From 6544b7eb493a5b150692ef01325752145845267e Mon Sep 17 00:00:00 2001 From: Nicholas Chammas Date: Tue, 10 Jun 2014 19:14:32 -0400 Subject: [PATCH 01/28] [SPARK-2065] give launched instances names MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This update gives launched EC2 instances descriptive names by using instance tags. Launched instances now show up in the EC2 console with these names. I used `format()` with named parameters, which I believe is the recommended practice for string formatting in Python, but which doesn’t seem to be used elsewhere in the script. --- ec2/spark_ec2.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/ec2/spark_ec2.py b/ec2/spark_ec2.py index 9d5748ba4bc23..4a12bdd2c9a81 100755 --- a/ec2/spark_ec2.py +++ b/ec2/spark_ec2.py @@ -418,6 +418,12 @@ def launch_cluster(conn, opts, cluster_name): master_nodes = master_res.instances print "Launched master in %s, regid = %s" % (zone, master_res.id) + # Give the instances descriptive names + for master in master_nodes: + master.add_tag(key='Name', value='spark-{cn}-master-{iid}'.format(cn=cluster_name, iid=master.id)) + for slave in slave_nodes: + slave.add_tag(key='Name', value='spark-{cn}-slave-{iid}'.format(cn=cluster_name, iid=slave.id)) + # Return all the instances return (master_nodes, slave_nodes) From 2627247506bb3b5c3d7509081825a0d6c718895e Mon Sep 17 00:00:00 2001 From: Nicholas Chammas Date: Tue, 10 Jun 2014 22:25:28 -0400 Subject: [PATCH 02/28] broke up lines before they hit 100 chars --- ec2/spark_ec2.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/ec2/spark_ec2.py b/ec2/spark_ec2.py index 4a12bdd2c9a81..6ce531e5054d8 100755 --- a/ec2/spark_ec2.py +++ b/ec2/spark_ec2.py @@ -420,9 +420,13 @@ def launch_cluster(conn, opts, cluster_name): # Give the instances descriptive names for master in master_nodes: - master.add_tag(key='Name', value='spark-{cn}-master-{iid}'.format(cn=cluster_name, iid=master.id)) + master.add_tag( + key='Name', + value='spark-{cn}-master-{iid}'.format(cn=cluster_name, iid=master.id)) for slave in slave_nodes: - slave.add_tag(key='Name', value='spark-{cn}-slave-{iid}'.format(cn=cluster_name, iid=slave.id)) + slave.add_tag( + key='Name', + value='spark-{cn}-slave-{iid}'.format(cn=cluster_name, iid=slave.id)) # Return all the instances return (master_nodes, slave_nodes) From 69f6e222ad4c02ea0fb117d6a70a720ef2d4fa59 Mon Sep 17 00:00:00 2001 From: Nicholas Chammas Date: Tue, 10 Jun 2014 23:03:03 -0400 Subject: [PATCH 03/28] PEP8 fixes --- ec2/spark_ec2.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/ec2/spark_ec2.py b/ec2/spark_ec2.py index 6ce531e5054d8..52a89cb2481ca 100755 --- a/ec2/spark_ec2.py +++ b/ec2/spark_ec2.py @@ -200,6 +200,7 @@ def get_spark_shark_version(opts): sys.exit(1) return (version, spark_shark_map[version]) + # Attempt to resolve an appropriate AMI given the architecture and # region of the request. 
def get_spark_ami(opts): @@ -421,11 +422,11 @@ def launch_cluster(conn, opts, cluster_name): # Give the instances descriptive names for master in master_nodes: master.add_tag( - key='Name', + key='Name', value='spark-{cn}-master-{iid}'.format(cn=cluster_name, iid=master.id)) for slave in slave_nodes: slave.add_tag( - key='Name', + key='Name', value='spark-{cn}-slave-{iid}'.format(cn=cluster_name, iid=slave.id)) # Return all the instances From a36eed0b5772f90be9f11fceddf1c6a5618450ff Mon Sep 17 00:00:00 2001 From: Nicholas Chammas Date: Wed, 9 Jul 2014 10:50:07 -0400 Subject: [PATCH 04/28] name ec2 instances and security groups consistently MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Security groups created by spark-ec2 do not prepend “spark-“ to the name. Since naming the instances themselves is new to spark-ec2, it’s better to change that pattern to match the existing naming pattern for the security groups, rather than the other way around. --- ec2/spark_ec2.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/ec2/spark_ec2.py b/ec2/spark_ec2.py index f5c2bfb697c81..64b31b9d2e620 100755 --- a/ec2/spark_ec2.py +++ b/ec2/spark_ec2.py @@ -428,11 +428,11 @@ def launch_cluster(conn, opts, cluster_name): for master in master_nodes: master.add_tag( key='Name', - value='spark-{cn}-master-{iid}'.format(cn=cluster_name, iid=master.id)) + value='{cn}-master-{iid}'.format(cn=cluster_name, iid=master.id)) for slave in slave_nodes: slave.add_tag( key='Name', - value='spark-{cn}-slave-{iid}'.format(cn=cluster_name, iid=slave.id)) + value='{cn}-slave-{iid}'.format(cn=cluster_name, iid=slave.id)) # Return all the instances return (master_nodes, slave_nodes) From f7e45813a3a58ba369d5f21595f671c84f5b91ff Mon Sep 17 00:00:00 2001 From: Nicholas Chammas Date: Wed, 9 Jul 2014 14:26:17 -0400 Subject: [PATCH 05/28] unrelated pep8 fix Functions in Python should be preceded by 2 blank lines, not 1. --- ec2/spark_ec2.py | 1 + 1 file changed, 1 insertion(+) diff --git a/ec2/spark_ec2.py b/ec2/spark_ec2.py index 64b31b9d2e620..44775ea479ece 100755 --- a/ec2/spark_ec2.py +++ b/ec2/spark_ec2.py @@ -699,6 +699,7 @@ def ssh(host, opts, command): time.sleep(30) tries = tries + 1 + # Backported from Python 2.7 for compatiblity with 2.6 (See SPARK-1990) def _check_output(*popenargs, **kwargs): if 'stdout' in kwargs: From f0a7ebf8700ca13a43edda2a41921e317ebd8dfc Mon Sep 17 00:00:00 2001 From: Nicholas Chammas Date: Sun, 20 Jul 2014 14:43:08 -0400 Subject: [PATCH 06/28] [SPARK-2470] PEP8 fixes to rddsampler.py --- python/pyspark/rddsampler.py | 24 ++++++++++++------------ 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/python/pyspark/rddsampler.py b/python/pyspark/rddsampler.py index 122bc38b03b0c..7ff1c316c7623 100644 --- a/python/pyspark/rddsampler.py +++ b/python/pyspark/rddsampler.py @@ -18,13 +18,16 @@ import sys import random + class RDDSampler(object): def __init__(self, withReplacement, fraction, seed=None): try: import numpy self._use_numpy = True except ImportError: - print >> sys.stderr, "NumPy does not appear to be installed. Falling back to default random generator for sampling." + print >> sys.stderr, ( + "NumPy does not appear to be installed. 
" + "Falling back to default random generator for sampling.") self._use_numpy = False self._seed = seed if seed is not None else random.randint(0, sys.maxint) @@ -61,7 +64,7 @@ def getUniformSample(self, split): def getPoissonSample(self, split, mean): if not self._rand_initialized or split != self._split: self.initRandomGenerator(split) - + if self._use_numpy: return self._random.poisson(mean) else: @@ -80,30 +83,27 @@ def getPoissonSample(self, split, mean): num_arrivals += 1 return (num_arrivals - 1) - + def shuffle(self, vals): if self._random is None: self.initRandomGenerator(0) # this should only ever called on the master so # the split does not matter - + if self._use_numpy: self._random.shuffle(vals) else: self._random.shuffle(vals, self._random.random) def func(self, split, iterator): - if self._withReplacement: + if self._withReplacement: for obj in iterator: - # For large datasets, the expected number of occurrences of each element in a sample with - # replacement is Poisson(frac). We use that to get a count for each element. - count = self.getPoissonSample(split, mean = self._fraction) + # For large datasets, the expected number of occurrences of each element in + # a sample with replacement is Poisson(frac). We use that to get a count for + # each element. + count = self.getPoissonSample(split, mean=self._fraction) for _ in range(0, count): yield obj else: for obj in iterator: if self.getUniformSample(split) <= self._fraction: yield obj - - - - From a6d5e4b049a5bbc3dd1eb46611c535f5cb9dbb8c Mon Sep 17 00:00:00 2001 From: Nicholas Chammas Date: Sun, 20 Jul 2014 15:53:48 -0400 Subject: [PATCH 07/28] [SPARK-2470] PEP8 fixes to cloudpickle.py --- python/pyspark/cloudpickle.py | 305 +++++++++++++++++++--------------- 1 file changed, 167 insertions(+), 138 deletions(-) diff --git a/python/pyspark/cloudpickle.py b/python/pyspark/cloudpickle.py index 4fda2a9b950b8..caa2baa32a364 100644 --- a/python/pyspark/cloudpickle.py +++ b/python/pyspark/cloudpickle.py @@ -55,7 +55,7 @@ import dis import traceback -#relevant opcodes +# relevant opcodes STORE_GLOBAL = chr(dis.opname.index('STORE_GLOBAL')) DELETE_GLOBAL = chr(dis.opname.index('DELETE_GLOBAL')) LOAD_GLOBAL = chr(dis.opname.index('LOAD_GLOBAL')) @@ -70,7 +70,11 @@ try: import ctypes except (MemoryError, ImportError): - logging.warning('Exception raised on importing ctypes. Likely python bug.. some functionality will be disabled', exc_info = True) + logging.warning( + ('Exception raised on importing ctypes. Likely python bug.. ' + 'some functionality will be disabled'), + exc_info=True + ) ctypes = None PyObject_HEAD = None else: @@ -87,9 +91,11 @@ except ImportError: from StringIO import StringIO + # These helper functions were copied from PiCloud's util module. def islambda(func): - return getattr(func,'func_name') == '' + return getattr(func, 'func_name') == '' + def xrange_params(xrangeobj): """Returns a 3 element tuple describing the xrange start, step, and len @@ -102,31 +108,32 @@ def xrange_params(xrangeobj): """ xrange_len = len(xrangeobj) - if not xrange_len: #empty - return (0,1,0) + if not xrange_len: # empty + return (0, 1, 0) start = xrangeobj[0] - if xrange_len == 1: #one element + if xrange_len == 1: # one element return start, 1, 1 return (start, xrangeobj[1] - xrangeobj[0], xrange_len) -#debug variables intended for developer use: +# debug variables intended for developer use: printSerialization = False printMemoization = False -useForcedImports = True #Should I use forced imports for tracking? 
- +useForcedImports = True # Should I use forced imports for tracking? class CloudPickler(pickle.Pickler): dispatch = pickle.Pickler.dispatch.copy() savedForceImports = False - savedDjangoEnv = False #hack tro transport django environment + savedDjangoEnv = False # hack tro transport django environment - def __init__(self, file, protocol=None, min_size_to_save= 0): - pickle.Pickler.__init__(self,file,protocol) - self.modules = set() #set of modules needed to depickle - self.globals_ref = {} # map ids to dictionary. used to ensure that functions can share global env + def __init__(self, file, protocol=None, min_size_to_save=0): + pickle.Pickler.__init__(self, file, protocol) + self.modules = set() # set of modules needed to depickle + # map ids to dictionary. used to ensure that + # functions can share global env + self.globals_ref = {} def dump(self, obj): # note: not thread safe @@ -150,59 +157,57 @@ def dump(self, obj): def save_buffer(self, obj): """Fallback to save_string""" - pickle.Pickler.save_string(self,str(obj)) + pickle.Pickler.save_string(self, str(obj)) dispatch[buffer] = save_buffer - #block broken objects + # block broken objects def save_unsupported(self, obj, pack=None): raise pickle.PicklingError("Cannot pickle objects of type %s" % type(obj)) dispatch[types.GeneratorType] = save_unsupported - #python2.6+ supports slice pickling. some py2.5 extensions might as well. We just test it + # python2.6+ supports slice pickling. some py2.5 extensions might as well. We just test it try: - slice(0,1).__reduce__() - except TypeError: #can't pickle - + slice(0, 1).__reduce__() + except TypeError: # can't pickle - dispatch[slice] = save_unsupported - #itertools objects do not pickle! + # itertools objects do not pickle! for v in itertools.__dict__.values(): if type(v) is type: dispatch[v] = save_unsupported - def save_dict(self, obj): """hack fix If the dict is a global, deal with it in a special way """ - #print 'saving', obj + # print 'saving', obj if obj is __builtins__: self.save_reduce(_get_module_builtins, (), obj=obj) else: pickle.Pickler.save_dict(self, obj) dispatch[pickle.DictionaryType] = save_dict - def save_module(self, obj, pack=struct.pack): """ Save a module as an import """ - #print 'try save import', obj.__name__ + # print 'try save import', obj.__name__ self.modules.add(obj) - self.save_reduce(subimport,(obj.__name__,), obj=obj) - dispatch[types.ModuleType] = save_module #new type + self.save_reduce(subimport, (obj.__name__,), obj=obj) + dispatch[types.ModuleType] = save_module # new type def save_codeobject(self, obj, pack=struct.pack): """ Save a code object """ - #print 'try to save codeobj: ', obj + # print 'try to save codeobj: ', obj args = ( obj.co_argcount, obj.co_nlocals, obj.co_stacksize, obj.co_flags, obj.co_code, obj.co_consts, obj.co_names, obj.co_varnames, obj.co_filename, obj.co_name, obj.co_firstlineno, obj.co_lnotab, obj.co_freevars, obj.co_cellvars ) self.save_reduce(types.CodeType, args, obj=obj) - dispatch[types.CodeType] = save_codeobject #new type + dispatch[types.CodeType] = save_codeobject # new type def save_function(self, obj, name=None, pack=struct.pack): """ Registered with the dispatch to handle all function types. 
@@ -214,10 +219,12 @@ def save_function(self, obj, name=None, pack=struct.pack): name = obj.__name__ modname = pickle.whichmodule(obj, name) - #print 'which gives %s %s %s' % (modname, obj, name) + # print 'which gives %s %s %s' % (modname, obj, name) try: themodule = sys.modules[modname] - except KeyError: # eval'd items such as namedtuple give invalid items for their function __module__ + except KeyError: + # eval'd items such as namedtuple give invalid items + # for their function __module__ modname = '__main__' if modname == '__main__': @@ -227,28 +234,29 @@ def save_function(self, obj, name=None, pack=struct.pack): self.modules.add(themodule) if not self.savedDjangoEnv: - #hack for django - if we detect the settings module, we transport it + # hack for django - if we detect the settings module, we transport it django_settings = os.environ.get('DJANGO_SETTINGS_MODULE', '') if django_settings: django_mod = sys.modules.get(django_settings) if django_mod: - cloudLog.debug('Transporting django settings %s during save of %s', django_mod, name) + cloudLog.debug( + 'Transporting django settings %s during save of %s', + django_mod, name) self.savedDjangoEnv = True self.modules.add(django_mod) write(pickle.MARK) self.save_reduce(django_settings_load, (django_mod.__name__,), obj=django_mod) write(pickle.POP_MARK) - # if func is lambda, def'ed at prompt, is in main, or is nested, then # we'll pickle the actual function object rather than simply saving a # reference (as is done in default pickler), via save_function_tuple. if islambda(obj) or obj.func_code.co_filename == '' or themodule is None: - #Force server to import modules that have been imported in main + # Force server to import modules that have been imported in main modList = None if themodule is None and not self.savedForceImports: mainmod = sys.modules['__main__'] - if useForcedImports and hasattr(mainmod,'___pyc_forcedImports__'): + if useForcedImports and hasattr(mainmod, '___pyc_forcedImports__'): modList = list(mainmod.___pyc_forcedImports__) self.savedForceImports = True self.save_function_tuple(obj, modList) @@ -290,12 +298,12 @@ def save_function_tuple(self, func, forced_imports): if forced_imports: write(pickle.MARK) save(_modules_to_main) - #print 'forced imports are', forced_imports + # print 'forced imports are', forced_imports forced_names = map(lambda m: m.__name__, forced_imports) save((forced_names,)) - #save((forced_imports,)) + # save((forced_imports,)) write(pickle.REDUCE) write(pickle.POP_MARK) @@ -342,7 +350,7 @@ def extract_code_globals(co): extended_arg = oparg*65536L if op in GLOBAL_OPS: out_names.add(names[oparg]) - #print 'extracted', out_names, ' from ', names + # print 'extracted', out_names, ' from ', names return out_names def extract_func_data(self, func): @@ -356,13 +364,14 @@ def extract_func_data(self, func): func_global_refs = CloudPickler.extract_code_globals(code) if code.co_consts: # see if nested function have any global refs for const in code.co_consts: - if type(const) is types.CodeType and const.co_names: - func_global_refs = func_global_refs.union( CloudPickler.extract_code_globals(const)) + if isinstance(const, types.CodeType) and const.co_names: + func_global_refs = func_global_refs.union( + CloudPickler.extract_code_globals(const)) # process all variables referenced by global environment f_globals = {} for var in func_global_refs: - #Some names, such as class functions are not global - we don't need them - if func.func_globals.has_key(var): + # Some names, such as class functions are not 
global - we don't need them + if var in func.func_globals: f_globals[var] = func.func_globals[var] # defaults requires no processing @@ -371,9 +380,10 @@ def extract_func_data(self, func): def get_contents(cell): try: return cell.cell_contents - except ValueError, e: #cell is empty error on not yet assigned - raise pickle.PicklingError('Function to be pickled has free variables that are referenced before assignment in enclosing scope') - + except ValueError, e: # cell is empty error on not yet assigned + raise pickle.PicklingError( + ('Function to be pickled has free variables that are referenced before ' + 'assignment in enclosing scope')) # process closure if func.func_closure: @@ -385,7 +395,7 @@ def get_contents(cell): dct = func.func_dict if printSerialization: - outvars = ['code: ' + str(code) ] + outvars = ['code: ' + str(code)] outvars.append('globals: ' + str(f_globals)) outvars.append('defaults: ' + str(defaults)) outvars.append('closure: ' + str(closure)) @@ -410,7 +420,7 @@ def save_global(self, obj, name=None, pack=struct.pack): try: __import__(modname) themodule = sys.modules[modname] - except (ImportError, KeyError, AttributeError): #should never occur + except (ImportError, KeyError, AttributeError): # should never occur raise pickle.PicklingError( "Can't pickle %r: Module %s cannot be found" % (obj, modname)) @@ -423,46 +433,48 @@ def save_global(self, obj, name=None, pack=struct.pack): sendRef = True typ = type(obj) - #print 'saving', obj, typ + # print 'saving', obj, typ try: - try: #Deal with case when getattribute fails with exceptions + try: # Deal with case when getattribute fails with exceptions klass = getattr(themodule, name) except (AttributeError): - if modname == '__builtin__': #new.* are misrepeported + if modname == '__builtin__': # new.* are misrepeported modname = 'new' __import__(modname) themodule = sys.modules[modname] try: klass = getattr(themodule, name) except AttributeError, a: - #print themodule, name, obj, type(obj) + # print themodule, name, obj, type(obj) raise pickle.PicklingError("Can't pickle builtin %s" % obj) else: raise except (ImportError, KeyError, AttributeError): - if typ == types.TypeType or typ == types.ClassType: + if isinstance(obj, types.TypeType) or isinstance(obj, types.ClassType): sendRef = False - else: #we can't deal with this + else: # we can't deal with this raise else: - if klass is not obj and (typ == types.TypeType or typ == types.ClassType): + if klass is not obj and + (isinstance(obj, types.TypeType) or isinstance(obj, types.ClassType)): sendRef = False if not sendRef: - #note: Third party types might crash this - add better checks! - d = dict(obj.__dict__) #copy dict proxy to a dict - if not isinstance(d.get('__dict__', None), property): # don't extract dict that are properties - d.pop('__dict__',None) - d.pop('__weakref__',None) + # note: Third party types might crash this - add better checks! 
+ d = dict(obj.__dict__) # copy dict proxy to a dict + + # don't extract dict that are properties + if not isinstance(d.get('__dict__', None), property): + d.pop('__dict__', None) + d.pop('__weakref__', None) # hack as __new__ is stored differently in the __dict__ new_override = d.get('__new__', None) if new_override: d['__new__'] = obj.__new__ - self.save_reduce(type(obj),(obj.__name__,obj.__bases__, - d),obj=obj) - #print 'internal reduce dask %s %s' % (obj, d) + self.save_reduce(type(obj), (obj.__name__, obj.__bases__, d), obj=obj) + # print 'internal reduce dask %s %s' % (obj, d) return if self.proto >= 2: @@ -472,7 +484,7 @@ def save_global(self, obj, name=None, pack=struct.pack): if code <= 0xff: write(pickle.EXT1 + chr(code)) elif code <= 0xffff: - write("%c%c%c" % (pickle.EXT2, code&0xff, code>>8)) + write("%c%c%c" % (pickle.EXT2, code & 0xff, code >> 8)) else: write(pickle.EXT4 + pack("= 2 and getattr(func, "__name__", "") == "__newobj__": - #Added fix to allow transient + # Added fix to allow transient cls = args[0] if not hasattr(cls, "__new__"): raise pickle.PicklingError( @@ -596,7 +606,7 @@ def save_reduce(self, func, args, state=None, args = args[1:] save(cls) - #Don't pickle transient entries + # Don't pickle transient entries if hasattr(obj, '__transient__'): transient = obj.__transient__ state = state.copy() @@ -627,46 +637,44 @@ def save_reduce(self, func, args, state=None, self._batch_setitems(dictitems) if state is not None: - #print 'obj %s has state %s' % (obj, state) + # print 'obj %s has state %s' % (obj, state) save(state) write(pickle.BUILD) - def save_xrange(self, obj): """Save an xrange object in python 2.5 Python 2.6 supports this natively """ range_params = xrange_params(obj) - self.save_reduce(_build_xrange,range_params) + self.save_reduce(_build_xrange, range_params) - #python2.6+ supports xrange pickling. some py2.5 extensions might as well. We just test it + # python2.6+ supports xrange pickling. some py2.5 extensions might as well. 
We just test it try: xrange(0).__reduce__() - except TypeError: #can't pickle -- use PiCloud pickler + except TypeError: # can't pickle -- use PiCloud pickler dispatch[xrange] = save_xrange def save_partial(self, obj): """Partial objects do not serialize correctly in python2.x -- this fixes the bugs""" self.save_reduce(_genpartial, (obj.func, obj.args, obj.keywords)) - if sys.version_info < (2,7): #2.7 supports partial pickling + if sys.version_info < (2, 7): # 2.7 supports partial pickling dispatch[partial] = save_partial - def save_file(self, obj): """Save a file""" - import StringIO as pystringIO #we can't use cStringIO as it lacks the name attribute + import StringIO as pystringIO # we can't use cStringIO as it lacks the name attribute from ..transport.adapter import SerializingAdapter - if not hasattr(obj, 'name') or not hasattr(obj, 'mode'): + if not hasattr(obj, 'name') or not hasattr(obj, 'mode'): raise pickle.PicklingError("Cannot pickle files that do not map to an actual file") if obj.name == '': - return self.save_reduce(getattr, (sys,'stdout'), obj=obj) + return self.save_reduce(getattr, (sys, 'stdout'), obj=obj) if obj.name == '': - return self.save_reduce(getattr, (sys,'stderr'), obj=obj) + return self.save_reduce(getattr, (sys, 'stderr'), obj=obj) if obj.name == '': raise pickle.PicklingError("Cannot pickle standard input") - if hasattr(obj, 'isatty') and obj.isatty(): + if hasattr(obj, 'isatty') and obj.isatty(): raise pickle.PicklingError("Cannot pickle files that map to tty objects") if 'r' not in obj.mode: raise pickle.PicklingError("Cannot pickle files that are not opened for reading") @@ -677,10 +685,10 @@ def save_file(self, obj): raise pickle.PicklingError("Cannot pickle file %s as it cannot be stat" % name) if obj.closed: - #create an empty closed string io + # create an empty closed string io retval = pystringIO.StringIO("") retval.close() - elif not fsize: #empty file + elif not fsize: # empty file retval = pystringIO.StringIO("") try: tmpfile = file(name) @@ -689,10 +697,13 @@ def save_file(self, obj): raise pickle.PicklingError("Cannot pickle file %s as it cannot be read" % name) tmpfile.close() if tst != '': - raise pickle.PicklingError("Cannot pickle file %s as it does not appear to map to a physical, real file" % name) + raise pickle.PicklingError( + ("Cannot pickle file %s as it does not appear to map to " + "a physical, real file") % name) elif fsize > SerializingAdapter.max_transmit_data: - raise pickle.PicklingError("Cannot pickle file %s as it exceeds cloudconf.py's max_transmit_data of %d" % - (name,SerializingAdapter.max_transmit_data)) + raise pickle.PicklingError( + "Cannot pickle file %s as it exceeds cloudconf.py's max_transmit_data of %d" % + (name, SerializingAdapter.max_transmit_data)) else: try: tmpfile = file(name) @@ -705,7 +716,7 @@ def save_file(self, obj): retval.seek(curloc) retval.name = name - self.save(retval) #save stringIO + self.save(retval) # save stringIO self.memoize(obj) dispatch[file] = save_file @@ -718,6 +729,7 @@ def inject_numpy(self): self.dispatch[numpy.ufunc] = self.__class__.save_ufunc numpy_tst_mods = ['numpy', 'scipy.special'] + def save_ufunc(self, obj): """Hack function for saving numpy ufunc objects""" name = obj.__name__ @@ -727,7 +739,8 @@ def save_ufunc(self, obj): if name in tst_mod.__dict__: self.save_reduce(_getobject, (tst_mod_name, name)) return - raise pickle.PicklingError('cannot save %s. Cannot resolve what module it is defined in' % str(obj)) + raise pickle.PicklingError( + 'cannot save %s. 
Cannot resolve what module it is defined in' % str(obj)) def inject_timeseries(self): """Handle bugs with pickling scikits timeseries""" @@ -741,20 +754,22 @@ def save_timeseries(self, obj): func, reduce_args, state = obj.__reduce__() if func != ts._tsreconstruct: - raise pickle.PicklingError('timeseries using unexpected reconstruction function %s' % str(func)) - state = (1, - obj.shape, - obj.dtype, - obj.flags.fnc, - obj._data.tostring(), - ts.getmaskarray(obj).tostring(), - obj._fill_value, - obj._dates.shape, - obj._dates.__array__().tostring(), - obj._dates.dtype, #added -- preserve type - obj.freq, - obj._optinfo, - ) + raise pickle.PicklingError( + 'timeseries using unexpected reconstruction function %s' % str(func)) + state = ( + 1, + obj.shape, + obj.dtype, + obj.flags.fnc, + obj._data.tostring(), + ts.getmaskarray(obj).tostring(), + obj._fill_value, + obj._dates.shape, + obj._dates.__array__().tostring(), + obj._dates.dtype, # added -- preserve type + obj.freq, + obj._optinfo, + ) return self.save_reduce(_genTimeSeries, (reduce_args, state)) def inject_email(self): @@ -772,12 +787,12 @@ def inject_addons(self): """Python Imaging Library""" def save_image(self, obj): - if not obj.im and obj.fp and 'r' in obj.fp.mode and obj.fp.name \ - and not obj.fp.closed and (not hasattr(obj, 'isatty') or not obj.isatty()): - #if image not loaded yet -- lazy load - self.save_reduce(_lazyloadImage,(obj.fp,), obj=obj) + if not obj.im and obj.fp and 'r' in obj.fp.mode and obj.fp.name and + not obj.fp.closed and (not hasattr(obj, 'isatty') or not obj.isatty()): + # if image not loaded yet -- lazy load + self.save_reduce(_lazyloadImage, (obj.fp,), obj=obj) else: - #image is loaded - just transmit it over + # image is loaded - just transmit it over self.save_reduce(_generateImage, (obj.size, obj.mode, obj.tostring()), obj=obj) """ @@ -788,34 +803,35 @@ def memoize(self, obj): """ - # Shorthands for legacy support def dump(obj, file, protocol=2): CloudPickler(file, protocol).dump(obj) + def dumps(obj, protocol=2): file = StringIO() - cp = CloudPickler(file,protocol) + cp = CloudPickler(file, protocol) cp.dump(obj) - #print 'cloud dumped', str(obj), str(cp.modules) + # print 'cloud dumped', str(obj), str(cp.modules) return file.getvalue() -#hack for __import__ not working as desired +# hack for __import__ not working as desired def subimport(name): __import__(name) return sys.modules[name] -#hack to load django settings: + +# hack to load django settings: def django_settings_load(name): modified_env = False if 'DJANGO_SETTINGS_MODULE' not in os.environ: - os.environ['DJANGO_SETTINGS_MODULE'] = name # must set name first due to circular deps + os.environ['DJANGO_SETTINGS_MODULE'] = name # must set name first due to circular deps modified_env = True try: module = subimport(name) @@ -825,24 +841,28 @@ def django_settings_load(name): if modified_env: del os.environ['DJANGO_SETTINGS_MODULE'] else: - #add project directory to sys,path: - if hasattr(module,'__file__'): + # add project directory to sys,path: + if hasattr(module, '__file__'): dirname = os.path.split(module.__file__)[0] + '/' sys.path.append(dirname) + # restores function attributes def _restore_attr(obj, attr): for key, val in attr.items(): setattr(obj, key, val) return obj + def _get_module_builtins(): return pickle.__builtins__ + def print_exec(stream): ei = sys.exc_info() traceback.print_exception(ei[0], ei[1], ei[2], None, stream) + def _modules_to_main(modList): """Force every module in modList to be placed into main""" if not modList: @@ 
-853,22 +873,24 @@ def _modules_to_main(modList): if type(modname) is str: try: mod = __import__(modname) - except Exception, i: #catch all... + except Exception, i: # catch all... sys.stderr.write('warning: could not import %s\n. Your function may unexpectedly error due to this import failing; \ A version mismatch is likely. Specific error was:\n' % modname) print_exec(sys.stderr) else: - setattr(main,mod.__name__, mod) + setattr(main, mod.__name__, mod) else: - #REVERSE COMPATIBILITY FOR CLOUD CLIENT 1.5 (WITH EPD) - #In old version actual module was sent - setattr(main,modname.__name__, modname) + # REVERSE COMPATIBILITY FOR CLOUD CLIENT 1.5 (WITH EPD) + # In old version actual module was sent + setattr(main, modname.__name__, modname) + -#object generators: +# object generators: def _build_xrange(start, step, len): """Built xrange explicitly""" return xrange(start, start + step*len, step) + def _genpartial(func, args, kwds): if not args: args = () @@ -892,12 +914,13 @@ def _fill_function(func, globals, defaults, closure, dict): return func -def _make_skel_func(code, num_closures, base_globals = None): + +def _make_skel_func(code, num_closures, base_globals=None): """ Creates a skeleton function object that contains just the provided code and the correct number of cells in func_closure. All other func attributes (e.g. func_globals) are empty. """ - #build closure (cells): + # build closure (cells): if not ctypes: raise Exception('ctypes failed to import; cannot build function') @@ -925,17 +948,21 @@ def _make_skel_func(code, num_closures, base_globals = None): (), (), ('newval',), '', 'cell_changer', 1, '', ('c',), () ) + def _change_cell_value(cell, newval): """ Changes the contents of 'cell' object to newval """ return new.function(cell_changer_code, {}, None, (), (cell,))(newval) + """Constructors for 3rd party libraries Note: These can never be renamed due to client compatibility issues""" + def _getobject(modname, attribute): mod = __import__(modname, fromlist=[attribute]) return mod.__dict__[attribute] + def _generateImage(size, mode, str_rep): """Generate image from string representation""" import Image @@ -943,32 +970,34 @@ def _generateImage(size, mode, str_rep): i.fromstring(str_rep) return i + def _lazyloadImage(fp): import Image - fp.seek(0) #works in almost any case + fp.seek(0) # works in almost any case return Image.open(fp) + """Timeseries""" + + def _genTimeSeries(reduce_args, state): import scikits.timeseries.tseries as ts from numpy import ndarray from numpy.ma import MaskedArray - time_series = ts._tsreconstruct(*reduce_args) - #from setstate modified + # from setstate modified (ver, shp, typ, isf, raw, msk, flv, dsh, dtm, dtyp, frq, infodict) = state - #print 'regenerating %s' % dtyp + # print 'regenerating %s' % dtyp MaskedArray.__setstate__(time_series, (ver, shp, typ, isf, raw, msk, flv)) _dates = time_series._dates - #_dates.__setstate__((ver, dsh, typ, isf, dtm, frq)) #use remote typ - ndarray.__setstate__(_dates,(dsh,dtyp, isf, dtm)) + # _dates.__setstate__((ver, dsh, typ, isf, dtm, frq)) #use remote typ + ndarray.__setstate__(_dates, (dsh, dtyp, isf, dtm)) _dates.freq = frq _dates._cachedinfo.update(dict(full=None, hasdups=None, steps=None, toobj=None, toord=None, tostr=None)) # Update the _optinfo dictionary time_series._optinfo.update(infodict) return time_series - From f4e00399663b5e7e641fcfc42747dde38dcf4e76 Mon Sep 17 00:00:00 2001 From: Nicholas Chammas Date: Sun, 20 Jul 2014 15:53:59 -0400 Subject: [PATCH 08/28] [SPARK-2470] PEP8 fixes to conf.py --- 
python/pyspark/conf.py | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/python/pyspark/conf.py b/python/pyspark/conf.py index 60fc6ba7c52c2..8f75cd52a1378 100644 --- a/python/pyspark/conf.py +++ b/python/pyspark/conf.py @@ -50,7 +50,8 @@ spark.executorEnv.VAR4=value4 spark.home=/path >>> sorted(conf.getAll(), key=lambda p: p[0]) -[(u'spark.executorEnv.VAR1', u'value1'), (u'spark.executorEnv.VAR3', u'value3'), (u'spark.executorEnv.VAR4', u'value4'), (u'spark.home', u'/path')] +[(u'spark.executorEnv.VAR1', u'value1'), (u'spark.executorEnv.VAR3', u'value3'),\ +(u'spark.executorEnv.VAR4', u'value4'), (u'spark.home', u'/path')] """ @@ -118,9 +119,9 @@ def setExecutorEnv(self, key=None, value=None, pairs=None): """Set an environment variable to be passed to executors.""" if (key is not None and pairs is not None) or (key is None and pairs is None): raise Exception("Either pass one key-value pair or a list of pairs") - elif key != None: + elif key is not None: self._jconf.setExecutorEnv(key, value) - elif pairs != None: + elif pairs is not None: for (k, v) in pairs: self._jconf.setExecutorEnv(k, v) return self @@ -137,7 +138,7 @@ def setAll(self, pairs): def get(self, key, defaultValue=None): """Get the configured value for some key, or return a default otherwise.""" - if defaultValue == None: # Py4J doesn't call the right get() if we pass None + if defaultValue is None: # Py4J doesn't call the right get() if we pass None if not self._jconf.contains(key): return None return self._jconf.get(key) From ca2d28b63776b1901afc255f81b59cd532f39fcb Mon Sep 17 00:00:00 2001 From: Nicholas Chammas Date: Sun, 20 Jul 2014 15:54:16 -0400 Subject: [PATCH 09/28] [SPARK-2470] PEP8 fixes to context.py --- python/pyspark/context.py | 45 ++++++++++++++++++++++----------------- 1 file changed, 25 insertions(+), 20 deletions(-) diff --git a/python/pyspark/context.py b/python/pyspark/context.py index 95c54e7a5ad63..c1c84d001fa96 100644 --- a/python/pyspark/context.py +++ b/python/pyspark/context.py @@ -29,7 +29,7 @@ from pyspark.files import SparkFiles from pyspark.java_gateway import launch_gateway from pyspark.serializers import PickleSerializer, BatchedSerializer, UTF8Deserializer, \ - PairDeserializer + PairDeserializer from pyspark.storagelevel import StorageLevel from pyspark import rdd from pyspark.rdd import RDD @@ -50,12 +50,11 @@ class SparkContext(object): _next_accum_id = 0 _active_spark_context = None _lock = Lock() - _python_includes = None # zip and egg files that need to be added to PYTHONPATH - + _python_includes = None # zip and egg files that need to be added to PYTHONPATH def __init__(self, master=None, appName=None, sparkHome=None, pyFiles=None, - environment=None, batchSize=1024, serializer=PickleSerializer(), conf=None, - gateway=None): + environment=None, batchSize=1024, serializer=PickleSerializer(), conf=None, + gateway=None): """ Create a new SparkContext. At least the master and app name should be set, either through the named parameters here or through C{conf}. 
@@ -138,8 +137,8 @@ def __init__(self, master=None, appName=None, sparkHome=None, pyFiles=None, self._accumulatorServer = accumulators._start_update_server() (host, port) = self._accumulatorServer.server_address self._javaAccumulator = self._jsc.accumulator( - self._jvm.java.util.ArrayList(), - self._jvm.PythonAccumulatorParam(host, port)) + self._jvm.java.util.ArrayList(), + self._jvm.PythonAccumulatorParam(host, port)) self.pythonExec = os.environ.get("PYSPARK_PYTHON", 'python') @@ -165,7 +164,7 @@ def __init__(self, master=None, appName=None, sparkHome=None, pyFiles=None, (dirname, filename) = os.path.split(path) self._python_includes.append(filename) sys.path.append(path) - if not dirname in sys.path: + if dirname not in sys.path: sys.path.append(dirname) # Create a temporary directory inside spark.local.dir: @@ -192,15 +191,19 @@ def _ensure_initialized(cls, instance=None, gateway=None): SparkContext._writeToFile = SparkContext._jvm.PythonRDD.writeToFile if instance: - if SparkContext._active_spark_context and SparkContext._active_spark_context != instance: + if SparkContext._active_spark_context and + SparkContext._active_spark_context != instance: currentMaster = SparkContext._active_spark_context.master currentAppName = SparkContext._active_spark_context.appName callsite = SparkContext._active_spark_context._callsite # Raise error if there is already a running Spark context - raise ValueError("Cannot run multiple SparkContexts at once; existing SparkContext(app=%s, master=%s)" \ - " created by %s at %s:%s " \ - % (currentAppName, currentMaster, callsite.function, callsite.file, callsite.linenum)) + raise ValueError( + "Cannot run multiple SparkContexts at once; " + "existing SparkContext(app=%s, master=%s)" + " created by %s at %s:%s " + % (currentAppName, currentMaster, + callsite.function, callsite.file, callsite.linenum)) else: SparkContext._active_spark_context = instance @@ -290,7 +293,7 @@ def textFile(self, name, minPartitions=None): Read a text file from HDFS, a local file system (available on all nodes), or any Hadoop-supported file system URI, and return it as an RDD of Strings. - + >>> path = os.path.join(tempdir, "sample-text.txt") >>> with open(path, "w") as testFile: ... testFile.write("Hello world!") @@ -584,11 +587,12 @@ def addPyFile(self, path): HTTP, HTTPS or FTP URI. """ self.addFile(path) - (dirname, filename) = os.path.split(path) # dirname may be directory or HDFS/S3 prefix + (dirname, filename) = os.path.split(path) # dirname may be directory or HDFS/S3 prefix if filename.endswith('.zip') or filename.endswith('.ZIP') or filename.endswith('.egg'): self._python_includes.append(filename) - sys.path.append(os.path.join(SparkFiles.getRootDirectory(), filename)) # for tests in local mode + # for tests in local mode + sys.path.append(os.path.join(SparkFiles.getRootDirectory(), filename)) def setCheckpointDir(self, dirName): """ @@ -649,9 +653,9 @@ def setJobGroup(self, groupId, description, interruptOnCancel=False): Cancelled If interruptOnCancel is set to true for the job group, then job cancellation will result - in Thread.interrupt() being called on the job's executor threads. This is useful to help ensure - that the tasks are actually stopped in a timely manner, but is off by default due to HDFS-1208, - where HDFS may respond to Thread.interrupt() by marking nodes as dead. + in Thread.interrupt() being called on the job's executor threads. 
This is useful to help + ensure that the tasks are actually stopped in a timely manner, but is off by default due + to HDFS-1208, where HDFS may respond to Thread.interrupt() by marking nodes as dead. """ self._jsc.setJobGroup(groupId, description, interruptOnCancel) @@ -688,7 +692,7 @@ def cancelAllJobs(self): """ self._jsc.sc().cancelAllJobs() - def runJob(self, rdd, partitionFunc, partitions = None, allowLocal = False): + def runJob(self, rdd, partitionFunc, partitions=None, allowLocal=False): """ Executes the given partitionFunc on the specified set of partitions, returning the result as an array of elements. @@ -703,7 +707,7 @@ def runJob(self, rdd, partitionFunc, partitions = None, allowLocal = False): >>> sc.runJob(myRDD, lambda part: [x * x for x in part], [0, 2], True) [0, 1, 16, 25] """ - if partitions == None: + if partitions is None: partitions = range(rdd._jrdd.partitions().size()) javaPartitions = ListConverter().convert(partitions, self._gateway._gateway_client) @@ -714,6 +718,7 @@ def runJob(self, rdd, partitionFunc, partitions = None, allowLocal = False): it = self._jvm.PythonRDD.runJob(self._jsc.sc(), mappedRDD._jrdd, javaPartitions, allowLocal) return list(mappedRDD._collect_iterator_through_file(it)) + def _test(): import atexit import doctest From 7fc849cd5bd01d2141b786e362419f59bc039b5a Mon Sep 17 00:00:00 2001 From: Nicholas Chammas Date: Sun, 20 Jul 2014 15:54:28 -0400 Subject: [PATCH 10/28] [SPARK-2470] PEP8 fixes to daemon.py --- python/pyspark/daemon.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/python/pyspark/daemon.py b/python/pyspark/daemon.py index 5eb1c63bf206b..8a5873ded2b8b 100644 --- a/python/pyspark/daemon.py +++ b/python/pyspark/daemon.py @@ -42,12 +42,12 @@ def should_exit(): def compute_real_exit_code(exit_code): - # SystemExit's code can be integer or string, but os._exit only accepts integers - import numbers - if isinstance(exit_code, numbers.Integral): - return exit_code - else: - return 1 + # SystemExit's code can be integer or string, but os._exit only accepts integers + import numbers + if isinstance(exit_code, numbers.Integral): + return exit_code + else: + return 1 def worker(listen_sock): From 1bde2658a0aea55dad57b3ad2864ddd6fa363f78 Mon Sep 17 00:00:00 2001 From: Nicholas Chammas Date: Sun, 20 Jul 2014 15:54:42 -0400 Subject: [PATCH 11/28] [SPARK-2470] PEP8 fixes to java_gateway.py --- python/pyspark/java_gateway.py | 1 + 1 file changed, 1 insertion(+) diff --git a/python/pyspark/java_gateway.py b/python/pyspark/java_gateway.py index 2a17127a7e0f9..2c129679f47f3 100644 --- a/python/pyspark/java_gateway.py +++ b/python/pyspark/java_gateway.py @@ -24,6 +24,7 @@ from threading import Thread from py4j.java_gateway import java_import, JavaGateway, GatewayClient + def launch_gateway(): SPARK_HOME = os.environ["SPARK_HOME"] From 81fcb2016367e4a0a00911ae71bbeede05f49e4e Mon Sep 17 00:00:00 2001 From: Nicholas Chammas Date: Sun, 20 Jul 2014 16:15:53 -0400 Subject: [PATCH 12/28] [SPARK-2470] PEP8 fixes to resultiterable.py --- python/pyspark/resultiterable.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/python/pyspark/resultiterable.py b/python/pyspark/resultiterable.py index 7f418f8d2e29a..df34740fc8176 100644 --- a/python/pyspark/resultiterable.py +++ b/python/pyspark/resultiterable.py @@ -19,6 +19,7 @@ import collections + class ResultIterable(collections.Iterable): """ A special result iterable. 
This is used because the standard iterator can not be pickled @@ -27,7 +28,9 @@ def __init__(self, data): self.data = data self.index = 0 self.maxindex = len(data) + def __iter__(self): return iter(self.data) + def __len__(self): return len(self.data) From d14f2f18587ecb53a8c769a6ab36fa132e8d7361 Mon Sep 17 00:00:00 2001 From: Nicholas Chammas Date: Sun, 20 Jul 2014 16:16:04 -0400 Subject: [PATCH 13/28] [SPARK-2470] PEP8 fixes to __init__.py --- python/pyspark/__init__.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/python/pyspark/__init__.py b/python/pyspark/__init__.py index 07df8697bd1a8..312c75d112cbf 100644 --- a/python/pyspark/__init__.py +++ b/python/pyspark/__init__.py @@ -59,4 +59,5 @@ from pyspark.storagelevel import StorageLevel -__all__ = ["SparkConf", "SparkContext", "SQLContext", "RDD", "SchemaRDD", "SparkFiles", "StorageLevel", "Row"] +__all__ = ["SparkConf", "SparkContext", "SQLContext", "RDD", "SchemaRDD", + "SparkFiles", "StorageLevel", "Row"] From c85e1e506d291bbba4336419db1d9c1c9b57bb17 Mon Sep 17 00:00:00 2001 From: Nicholas Chammas Date: Sun, 20 Jul 2014 16:16:12 -0400 Subject: [PATCH 14/28] [SPARK-2470] PEP8 fixes to join.py --- python/pyspark/join.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/python/pyspark/join.py b/python/pyspark/join.py index 5f3a7e71f7866..b0f1cc1927066 100644 --- a/python/pyspark/join.py +++ b/python/pyspark/join.py @@ -33,10 +33,11 @@ from pyspark.resultiterable import ResultIterable + def _do_python_join(rdd, other, numPartitions, dispatch): vs = rdd.map(lambda (k, v): (k, (1, v))) ws = other.map(lambda (k, v): (k, (2, v))) - return vs.union(ws).groupByKey(numPartitions).flatMapValues(lambda x : dispatch(x.__iter__())) + return vs.union(ws).groupByKey(numPartitions).flatMapValues(lambda x: dispatch(x.__iter__())) def python_join(rdd, other, numPartitions): @@ -85,6 +86,7 @@ def make_mapper(i): vrdds = [rdd.map(make_mapper(i)) for i, rdd in enumerate(rdds)] union_vrdds = reduce(lambda acc, other: acc.union(other), vrdds) rdd_len = len(vrdds) + def dispatch(seq): bufs = [[] for i in range(rdd_len)] for (n, v) in seq: From a0fec2e00723e4b79914ef106c4e9745fca88bbc Mon Sep 17 00:00:00 2001 From: Nicholas Chammas Date: Sun, 20 Jul 2014 16:16:22 -0400 Subject: [PATCH 15/28] [SPARK-2470] PEP8 fixes to mllib --- python/pyspark/mllib/_common.py | 4 +++- python/pyspark/mllib/linalg.py | 1 + python/pyspark/mllib/util.py | 2 -- 3 files changed, 4 insertions(+), 3 deletions(-) diff --git a/python/pyspark/mllib/_common.py b/python/pyspark/mllib/_common.py index e609b60a0f968..43b491a9716fc 100644 --- a/python/pyspark/mllib/_common.py +++ b/python/pyspark/mllib/_common.py @@ -164,7 +164,7 @@ def _deserialize_double_vector(ba, offset=0): nb = len(ba) - offset if nb < 5: raise TypeError("_deserialize_double_vector called on a %d-byte array, " - "which is too short" % nb) + "which is too short" % nb) if ba[offset] == DENSE_VECTOR_MAGIC: return _deserialize_dense_vector(ba, offset) elif ba[offset] == SPARSE_VECTOR_MAGIC: @@ -272,6 +272,7 @@ def _serialize_labeled_point(p): header_float[0] = p.label return header + serialized_features + def _deserialize_labeled_point(ba, offset=0): """Deserialize a LabeledPoint from a mutually understood format.""" from pyspark.mllib.regression import LabeledPoint @@ -283,6 +284,7 @@ def _deserialize_labeled_point(ba, offset=0): features = _deserialize_double_vector(ba, offset + 9) return LabeledPoint(label, features) + def _copyto(array, buffer, offset, shape, dtype): """ Copy the 
contents of a vector to a destination bytearray at the diff --git a/python/pyspark/mllib/linalg.py b/python/pyspark/mllib/linalg.py index db39ed0acdb66..71f4ad1a8d44e 100644 --- a/python/pyspark/mllib/linalg.py +++ b/python/pyspark/mllib/linalg.py @@ -247,6 +247,7 @@ def stringify(vector): else: return "[" + ",".join([str(v) for v in vector]) + "]" + def _test(): import doctest (failure_count, test_count) = doctest.testmod(optionflags=doctest.ELLIPSIS) diff --git a/python/pyspark/mllib/util.py b/python/pyspark/mllib/util.py index e24c144f458bd..a707a9dcd5b49 100644 --- a/python/pyspark/mllib/util.py +++ b/python/pyspark/mllib/util.py @@ -24,7 +24,6 @@ from pyspark.serializers import NoOpSerializer - class MLUtils: """ Helper methods to load, save and pre-process data used in MLlib. @@ -154,7 +153,6 @@ def saveAsLibSVMFile(data, dir): lines = data.map(lambda p: MLUtils._convert_labeled_point_to_libsvm(p)) lines.saveAsTextFile(dir) - @staticmethod def loadLabeledPoints(sc, path, minPartitions=None): """ From 95d1d95b29f2d7a525bac67ffdd1efe671216c62 Mon Sep 17 00:00:00 2001 From: Nicholas Chammas Date: Sun, 20 Jul 2014 16:16:30 -0400 Subject: [PATCH 16/28] [SPARK-2470] PEP8 fixes to serializers.py --- python/pyspark/serializers.py | 31 ++++++++++++++++++------------- 1 file changed, 18 insertions(+), 13 deletions(-) diff --git a/python/pyspark/serializers.py b/python/pyspark/serializers.py index b253807974a2e..f66045c12c367 100644 --- a/python/pyspark/serializers.py +++ b/python/pyspark/serializers.py @@ -91,7 +91,6 @@ def load_stream(self, stream): """ raise NotImplementedError - def _load_stream_without_unbatching(self, stream): return self.load_stream(stream) @@ -197,8 +196,8 @@ def _load_stream_without_unbatching(self, stream): return self.serializer.load_stream(stream) def __eq__(self, other): - return isinstance(other, BatchedSerializer) and \ - other.serializer == self.serializer + return isinstance(other, BatchedSerializer) and + other.serializer == self.serializer def __str__(self): return "BatchedSerializer<%s>" % str(self.serializer) @@ -229,8 +228,8 @@ def load_stream(self, stream): yield pair def __eq__(self, other): - return isinstance(other, CartesianDeserializer) and \ - self.key_ser == other.key_ser and self.val_ser == other.val_ser + return isinstance(other, CartesianDeserializer) and + self.key_ser == other.key_ser and self.val_ser == other.val_ser def __str__(self): return "CartesianDeserializer<%s, %s>" % \ @@ -252,18 +251,20 @@ def load_stream(self, stream): yield pair def __eq__(self, other): - return isinstance(other, PairDeserializer) and \ - self.key_ser == other.key_ser and self.val_ser == other.val_ser + return isinstance(other, PairDeserializer) and + self.key_ser == other.key_ser and self.val_ser == other.val_ser def __str__(self): - return "PairDeserializer<%s, %s>" % \ - (str(self.key_ser), str(self.val_ser)) + return "PairDeserializer<%s, %s>" % (str(self.key_ser), str(self.val_ser)) class NoOpSerializer(FramedSerializer): - def loads(self, obj): return obj - def dumps(self, obj): return obj + def loads(self, obj): + return obj + + def dumps(self, obj): + return obj class PickleSerializer(FramedSerializer): @@ -276,12 +277,16 @@ class PickleSerializer(FramedSerializer): not be as fast as more specialized serializers. 
""" - def dumps(self, obj): return cPickle.dumps(obj, 2) + def dumps(self, obj): + return cPickle.dumps(obj, 2) + loads = cPickle.loads + class CloudPickleSerializer(PickleSerializer): - def dumps(self, obj): return cloudpickle.dumps(obj, 2) + def dumps(self, obj): + return cloudpickle.dumps(obj, 2) class MarshalSerializer(FramedSerializer): From 19168591a6cd88e0fdaa5bf11c2849bc104a7681 Mon Sep 17 00:00:00 2001 From: Nicholas Chammas Date: Sun, 20 Jul 2014 16:16:38 -0400 Subject: [PATCH 17/28] [SPARK-2470] PEP8 fixes to shell.py --- python/pyspark/shell.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/python/pyspark/shell.py b/python/pyspark/shell.py index 2ce5409cd67c2..f222330c48261 100644 --- a/python/pyspark/shell.py +++ b/python/pyspark/shell.py @@ -35,7 +35,8 @@ from pyspark.storagelevel import StorageLevel # this is the equivalent of ADD_JARS -add_files = os.environ.get("ADD_FILES").split(',') if os.environ.get("ADD_FILES") is not None else None +add_files = ( + os.environ.get("ADD_FILES").split(',') if os.environ.get("ADD_FILES") is not None else None) if os.environ.get("SPARK_EXECUTOR_URI"): SparkContext.setSystemProperty("spark.executor.uri", os.environ["SPARK_EXECUTOR_URI"]) From aa3a7b64a8bbc23e077378bb1bd8c8bb89b7fae0 Mon Sep 17 00:00:00 2001 From: Nicholas Chammas Date: Sun, 20 Jul 2014 16:16:44 -0400 Subject: [PATCH 18/28] [SPARK-2470] PEP8 fixes to sql.py --- python/pyspark/sql.py | 38 +++++++++++++++++++++----------------- 1 file changed, 21 insertions(+), 17 deletions(-) diff --git a/python/pyspark/sql.py b/python/pyspark/sql.py index ffe177576f363..cb83e89176823 100644 --- a/python/pyspark/sql.py +++ b/python/pyspark/sql.py @@ -30,7 +30,7 @@ class SQLContext: tables, execute SQL over tables, cache tables, and read parquet files. """ - def __init__(self, sparkContext, sqlContext = None): + def __init__(self, sparkContext, sqlContext=None): """Create a new SQLContext. @param sparkContext: The SparkContext to wrap. @@ -137,7 +137,6 @@ def parquetFile(self, path): jschema_rdd = self._ssql_ctx.parquetFile(path) return SchemaRDD(jschema_rdd, self) - def jsonFile(self, path): """Loads a text file storing one JSON object per line, returning the result as a L{SchemaRDD}. @@ -234,8 +233,8 @@ def _ssql_ctx(self): self._scala_HiveContext = self._get_hive_ctx() return self._scala_HiveContext except Py4JError as e: - raise Exception("You must build Spark with Hive. Export 'SPARK_HIVE=true' and run " \ - "sbt/sbt assembly" , e) + raise Exception("You must build Spark with Hive. Export 'SPARK_HIVE=true' and run " + "sbt/sbt assembly", e) def _get_hive_ctx(self): return self._jvm.HiveContext(self._jsc.sc()) @@ -377,7 +376,7 @@ def registerAsTable(self, name): """ self._jschema_rdd.registerAsTable(name) - def insertInto(self, tableName, overwrite = False): + def insertInto(self, tableName, overwrite=False): """Inserts the contents of this SchemaRDD into the specified table. Optionally overwriting any existing data. @@ -420,7 +419,7 @@ def _toPython(self): # in Java land in the javaToPython function. 
May require a custom # pickle serializer in Pyrolite return RDD(jrdd, self._sc, BatchedSerializer( - PickleSerializer())).map(lambda d: Row(d)) + PickleSerializer())).map(lambda d: Row(d)) # We override the default cache/persist/checkpoint behavior as we want to cache the underlying # SchemaRDD object in the JVM, not the PythonRDD checkpointed by the super class @@ -483,6 +482,7 @@ def subtract(self, other, numPartitions=None): else: raise ValueError("Can only subtract another SchemaRDD") + def _test(): import doctest from array import array @@ -493,20 +493,25 @@ def _test(): sc = SparkContext('local[4]', 'PythonTest', batchSize=2) globs['sc'] = sc globs['sqlCtx'] = SQLContext(sc) - globs['rdd'] = sc.parallelize([{"field1" : 1, "field2" : "row1"}, - {"field1" : 2, "field2": "row2"}, {"field1" : 3, "field2": "row3"}]) - jsonStrings = ['{"field1": 1, "field2": "row1", "field3":{"field4":11}}', - '{"field1" : 2, "field3":{"field4":22, "field5": [10, 11]}, "field6":[{"field7": "row2"}]}', - '{"field1" : null, "field2": "row3", "field3":{"field4":33, "field5": []}}'] + globs['rdd'] = sc.parallelize( + [{"field1": 1, "field2": "row1"}, + {"field1": 2, "field2": "row2"}, + {"field1": 3, "field2": "row3"}] + ) + jsonStrings = [ + '{"field1": 1, "field2": "row1", "field3":{"field4":11}}', + '{"field1" : 2, "field3":{"field4":22, "field5": [10, 11]}, "field6":[{"field7": "row2"}]}', + '{"field1" : null, "field2": "row3", "field3":{"field4":33, "field5": []}}' + ] globs['jsonStrings'] = jsonStrings globs['json'] = sc.parallelize(jsonStrings) globs['nestedRdd1'] = sc.parallelize([ - {"f1" : array('i', [1, 2]), "f2" : {"row1" : 1.0}}, - {"f1" : array('i', [2, 3]), "f2" : {"row2" : 2.0}}]) + {"f1": array('i', [1, 2]), "f2": {"row1": 1.0}}, + {"f1": array('i', [2, 3]), "f2": {"row2": 2.0}}]) globs['nestedRdd2'] = sc.parallelize([ - {"f1" : [[1, 2], [2, 3]], "f2" : set([1, 2]), "f3" : (1, 2)}, - {"f1" : [[2, 3], [3, 4]], "f2" : set([2, 3]), "f3" : (2, 3)}]) - (failure_count, test_count) = doctest.testmod(globs=globs,optionflags=doctest.ELLIPSIS) + {"f1": [[1, 2], [2, 3]], "f2": set([1, 2]), "f3": (1, 2)}, + {"f1": [[2, 3], [3, 4]], "f2": set([2, 3]), "f3": (2, 3)}]) + (failure_count, test_count) = doctest.testmod(globs=globs, optionflags=doctest.ELLIPSIS) globs['sc'].stop() if failure_count: exit(-1) @@ -514,4 +519,3 @@ def _test(): if __name__ == "__main__": _test() - From d644477359cdf6bc1c77343dcfd4a3b818180122 Mon Sep 17 00:00:00 2001 From: Nicholas Chammas Date: Sun, 20 Jul 2014 16:16:51 -0400 Subject: [PATCH 19/28] [SPARK-2470] PEP8 fixes to worker.py --- python/pyspark/worker.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/python/pyspark/worker.py b/python/pyspark/worker.py index f43210c6c0301..24d41b12d1b1a 100644 --- a/python/pyspark/worker.py +++ b/python/pyspark/worker.py @@ -57,8 +57,8 @@ def main(infile, outfile): SparkFiles._is_running_on_worker = True # fetch names of includes (*.zip and *.egg files) and construct PYTHONPATH - sys.path.append(spark_files_dir) # *.py files that were added will be copied here - num_python_includes = read_int(infile) + sys.path.append(spark_files_dir) # *.py files that were added will be copied here + num_python_includes = read_int(infile) for _ in range(num_python_includes): filename = utf8_deserializer.loads(infile) sys.path.append(os.path.join(spark_files_dir, filename)) From b3b96cfcfc98013cc5d95886a8f9664f040443d6 Mon Sep 17 00:00:00 2001 From: Nicholas Chammas Date: Sun, 20 Jul 2014 16:16:59 -0400 Subject: [PATCH 20/28] 
[SPARK-2470] PEP8 fixes to statcounter.py --- python/pyspark/statcounter.py | 25 +++++++++++++------------ 1 file changed, 13 insertions(+), 12 deletions(-) diff --git a/python/pyspark/statcounter.py b/python/pyspark/statcounter.py index 080325061a697..6bdf6db9eca49 100644 --- a/python/pyspark/statcounter.py +++ b/python/pyspark/statcounter.py @@ -20,18 +20,19 @@ import copy import math + class StatCounter(object): - + def __init__(self, values=[]): self.n = 0L # Running count of our values self.mu = 0.0 # Running mean of our values self.m2 = 0.0 # Running variance numerator (sum of (x - mean)^2) self.maxValue = float("-inf") self.minValue = float("inf") - + for v in values: self.merge(v) - + # Add a value into this StatCounter, updating the internal statistics. def merge(self, value): delta = value - self.mu @@ -42,7 +43,7 @@ def merge(self, value): self.maxValue = value if self.minValue > value: self.minValue = value - + return self # Merge another StatCounter into this one, adding up the internal statistics. @@ -50,7 +51,7 @@ def mergeStats(self, other): if not isinstance(other, StatCounter): raise Exception("Can only merge Statcounters!") - if other is self: # reference equality holds + if other is self: # reference equality holds self.merge(copy.deepcopy(other)) # Avoid overwriting fields in a weird order else: if self.n == 0: @@ -59,8 +60,8 @@ def mergeStats(self, other): self.n = other.n self.maxValue = other.maxValue self.minValue = other.minValue - - elif other.n != 0: + + elif other.n != 0: delta = other.mu - self.mu if other.n * 10 < self.n: self.mu = self.mu + (delta * other.n) / (self.n + other.n) @@ -68,10 +69,10 @@ def mergeStats(self, other): self.mu = other.mu - (delta * self.n) / (self.n + other.n) else: self.mu = (self.mu * self.n + other.mu * other.n) / (self.n + other.n) - + self.maxValue = max(self.maxValue, other.maxValue) self.minValue = min(self.minValue, other.minValue) - + self.m2 += other.m2 + (delta * delta * self.n * other.n) / (self.n + other.n) self.n += other.n return self @@ -94,7 +95,7 @@ def min(self): def max(self): return self.maxValue - + # Return the variance of the values. def variance(self): if self.n == 0: @@ -124,5 +125,5 @@ def sampleStdev(self): return math.sqrt(self.sampleVariance()) def __repr__(self): - return "(count: %s, mean: %s, stdev: %s, max: %s, min: %s)" % (self.count(), self.mean(), self.stdev(), self.max(), self.min()) - + return "(count: %s, mean: %s, stdev: %s, max: %s, min: %s)" % + (self.count(), self.mean(), self.stdev(), self.max(), self.min()) From 8f8e4c0ecc9a2b5021a32f867475653b22505bc4 Mon Sep 17 00:00:00 2001 From: Nicholas Chammas Date: Sun, 20 Jul 2014 16:17:06 -0400 Subject: [PATCH 21/28] [SPARK-2470] PEP8 fixes to storagelevel.py --- python/pyspark/storagelevel.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/python/pyspark/storagelevel.py b/python/pyspark/storagelevel.py index 3a18ea54eae4c..5d77a131f2856 100644 --- a/python/pyspark/storagelevel.py +++ b/python/pyspark/storagelevel.py @@ -17,6 +17,7 @@ __all__ = ["StorageLevel"] + class StorageLevel: """ Flags for controlling the storage of an RDD. Each StorageLevel records whether to use memory, @@ -25,7 +26,7 @@ class StorageLevel: Also contains static constants for some commonly used storage levels, such as MEMORY_ONLY. 
""" - def __init__(self, useDisk, useMemory, useOffHeap, deserialized, replication = 1): + def __init__(self, useDisk, useMemory, useOffHeap, deserialized, replication=1): self.useDisk = useDisk self.useMemory = useMemory self.useOffHeap = useOffHeap @@ -55,4 +56,4 @@ def __str__(self): StorageLevel.MEMORY_AND_DISK_2 = StorageLevel(True, True, False, True, 2) StorageLevel.MEMORY_AND_DISK_SER = StorageLevel(True, True, False, False) StorageLevel.MEMORY_AND_DISK_SER_2 = StorageLevel(True, True, False, False, 2) -StorageLevel.OFF_HEAP = StorageLevel(False, False, True, False, 1) \ No newline at end of file +StorageLevel.OFF_HEAP = StorageLevel(False, False, True, False, 1) From 7d557b7385fcfcd763209ce4345aa76f02c6f845 Mon Sep 17 00:00:00 2001 From: Nicholas Chammas Date: Sun, 20 Jul 2014 16:17:14 -0400 Subject: [PATCH 22/28] [SPARK-2470] PEP8 fixes to tests.py --- python/pyspark/tests.py | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/python/pyspark/tests.py b/python/pyspark/tests.py index c15bb457759ed..9c5ecd0bb02ab 100644 --- a/python/pyspark/tests.py +++ b/python/pyspark/tests.py @@ -52,12 +52,13 @@ class PySparkTestCase(unittest.TestCase): def setUp(self): self._old_sys_path = list(sys.path) class_name = self.__class__.__name__ - self.sc = SparkContext('local[4]', class_name , batchSize=2) + self.sc = SparkContext('local[4]', class_name, batchSize=2) def tearDown(self): self.sc.stop() sys.path = self._old_sys_path + class TestCheckpoint(PySparkTestCase): def setUp(self): @@ -190,6 +191,7 @@ def test_deleting_input_files(self): def testAggregateByKey(self): data = self.sc.parallelize([(1, 1), (1, 1), (3, 2), (5, 1), (5, 3)], 2) + def seqOp(x, y): x.add(y) return x @@ -197,17 +199,19 @@ def seqOp(x, y): def combOp(x, y): x |= y return x - + sets = dict(data.aggregateByKey(set(), seqOp, combOp).collect()) self.assertEqual(3, len(sets)) self.assertEqual(set([1]), sets[1]) self.assertEqual(set([2]), sets[3]) self.assertEqual(set([1, 3]), sets[5]) + class TestIO(PySparkTestCase): def test_stdout_redirection(self): import subprocess + def func(x): subprocess.check_call('ls', shell=True) self.sc.parallelize([1]).foreach(func) @@ -479,7 +483,7 @@ def test_module_dependency(self): | return x + 1 """) proc = subprocess.Popen([self.sparkSubmit, "--py-files", zip, script], - stdout=subprocess.PIPE) + stdout=subprocess.PIPE) out, err = proc.communicate() self.assertEqual(0, proc.returncode) self.assertIn("[2, 3, 4]", out) From 24639bc2b7651a34882367684a3761f69f8dffa7 Mon Sep 17 00:00:00 2001 From: Nicholas Chammas Date: Mon, 21 Jul 2014 18:34:07 -0400 Subject: [PATCH 23/28] [SPARK-2470] fix whitespace for doctest --- python/pyspark/conf.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/pyspark/conf.py b/python/pyspark/conf.py index 8f75cd52a1378..b50590ab3b444 100644 --- a/python/pyspark/conf.py +++ b/python/pyspark/conf.py @@ -50,7 +50,7 @@ spark.executorEnv.VAR4=value4 spark.home=/path >>> sorted(conf.getAll(), key=lambda p: p[0]) -[(u'spark.executorEnv.VAR1', u'value1'), (u'spark.executorEnv.VAR3', u'value3'),\ +[(u'spark.executorEnv.VAR1', u'value1'), (u'spark.executorEnv.VAR3', u'value3'), \ (u'spark.executorEnv.VAR4', u'value4'), (u'spark.home', u'/path')] """ From 22132a4a17d1062b5cfa501b55b813e47b9f6d59 Mon Sep 17 00:00:00 2001 From: Nicholas Chammas Date: Mon, 21 Jul 2014 18:34:39 -0400 Subject: [PATCH 24/28] [SPARK-2470] wrap conditionals in parentheses --- python/pyspark/context.py | 4 ++-- 1 file changed, 2 insertions(+), 2 
deletions(-) diff --git a/python/pyspark/context.py b/python/pyspark/context.py index c1c84d001fa96..e21be0e10a3f7 100644 --- a/python/pyspark/context.py +++ b/python/pyspark/context.py @@ -191,8 +191,8 @@ def _ensure_initialized(cls, instance=None, gateway=None): SparkContext._writeToFile = SparkContext._jvm.PythonRDD.writeToFile if instance: - if SparkContext._active_spark_context and - SparkContext._active_spark_context != instance: + if (SparkContext._active_spark_context and + SparkContext._active_spark_context != instance): currentMaster = SparkContext._active_spark_context.master currentAppName = SparkContext._active_spark_context.appName callsite = SparkContext._active_spark_context._callsite From 9127d2bc56e6a1f71bf3975c7dd4242d6e165856 Mon Sep 17 00:00:00 2001 From: Nicholas Chammas Date: Mon, 21 Jul 2014 18:35:39 -0400 Subject: [PATCH 25/28] [SPARK-2470] wrap expression lists in parentheses --- python/pyspark/serializers.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/python/pyspark/serializers.py b/python/pyspark/serializers.py index f66045c12c367..9be78b39fbc21 100644 --- a/python/pyspark/serializers.py +++ b/python/pyspark/serializers.py @@ -196,8 +196,8 @@ def _load_stream_without_unbatching(self, stream): return self.serializer.load_stream(stream) def __eq__(self, other): - return isinstance(other, BatchedSerializer) and - other.serializer == self.serializer + return (isinstance(other, BatchedSerializer) and + other.serializer == self.serializer) def __str__(self): return "BatchedSerializer<%s>" % str(self.serializer) @@ -228,8 +228,8 @@ def load_stream(self, stream): yield pair def __eq__(self, other): - return isinstance(other, CartesianDeserializer) and - self.key_ser == other.key_ser and self.val_ser == other.val_ser + return (isinstance(other, CartesianDeserializer) and + self.key_ser == other.key_ser and self.val_ser == other.val_ser) def __str__(self): return "CartesianDeserializer<%s, %s>" % \ @@ -251,8 +251,8 @@ def load_stream(self, stream): yield pair def __eq__(self, other): - return isinstance(other, PairDeserializer) and - self.key_ser == other.key_ser and self.val_ser == other.val_ser + return (isinstance(other, PairDeserializer) and + self.key_ser == other.key_ser and self.val_ser == other.val_ser) def __str__(self): return "PairDeserializer<%s, %s>" % (str(self.key_ser), str(self.val_ser)) From e178dbe5d96ff97d1b294ac1424daa170770b234 Mon Sep 17 00:00:00 2001 From: Nicholas Chammas Date: Mon, 21 Jul 2014 18:35:54 -0400 Subject: [PATCH 26/28] [SPARK-2470] style - change position of line break --- python/pyspark/shell.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/python/pyspark/shell.py b/python/pyspark/shell.py index f222330c48261..e1e7cd954189f 100644 --- a/python/pyspark/shell.py +++ b/python/pyspark/shell.py @@ -35,8 +35,8 @@ from pyspark.storagelevel import StorageLevel # this is the equivalent of ADD_JARS -add_files = ( - os.environ.get("ADD_FILES").split(',') if os.environ.get("ADD_FILES") is not None else None) +add_files = (os.environ.get("ADD_FILES").split(',') + if os.environ.get("ADD_FILES") is not None else None) if os.environ.get("SPARK_EXECUTOR_URI"): SparkContext.setSystemProperty("spark.executor.uri", os.environ["SPARK_EXECUTOR_URI"]) From cba77687655a7b90c984d312294f227193ede51f Mon Sep 17 00:00:00 2001 From: Nicholas Chammas Date: Mon, 21 Jul 2014 18:36:13 -0400 Subject: [PATCH 27/28] [SPARK-2470] wrap expression list in parentheses --- python/pyspark/statcounter.py | 4 ++-- 1 
file changed, 2 insertions(+), 2 deletions(-) diff --git a/python/pyspark/statcounter.py b/python/pyspark/statcounter.py index 6bdf6db9eca49..e287bd3da1f61 100644 --- a/python/pyspark/statcounter.py +++ b/python/pyspark/statcounter.py @@ -125,5 +125,5 @@ def sampleStdev(self): return math.sqrt(self.sampleVariance()) def __repr__(self): - return "(count: %s, mean: %s, stdev: %s, max: %s, min: %s)" % - (self.count(), self.mean(), self.stdev(), self.max(), self.min()) + return ("(count: %s, mean: %s, stdev: %s, max: %s, min: %s)" % + (self.count(), self.mean(), self.stdev(), self.max(), self.min())) From 98171af6b24d4bb4858a646a18e638b9d7cdd0ac Mon Sep 17 00:00:00 2001 From: Nicholas Chammas Date: Mon, 21 Jul 2014 18:39:27 -0400 Subject: [PATCH 28/28] [SPARK-2470] revert PEP 8 fixes to cloudpickle Since cloudpickle is a third-party module, we should mostly leave it alone. Discussion here: https://github.com/apache/spark/pull/1505#discussion-diff-15197904 --- python/pyspark/cloudpickle.py | 305 +++++++++++++++------------------- 1 file changed, 138 insertions(+), 167 deletions(-) diff --git a/python/pyspark/cloudpickle.py b/python/pyspark/cloudpickle.py index caa2baa32a364..4fda2a9b950b8 100644 --- a/python/pyspark/cloudpickle.py +++ b/python/pyspark/cloudpickle.py @@ -55,7 +55,7 @@ import dis import traceback -# relevant opcodes +#relevant opcodes STORE_GLOBAL = chr(dis.opname.index('STORE_GLOBAL')) DELETE_GLOBAL = chr(dis.opname.index('DELETE_GLOBAL')) LOAD_GLOBAL = chr(dis.opname.index('LOAD_GLOBAL')) @@ -70,11 +70,7 @@ try: import ctypes except (MemoryError, ImportError): - logging.warning( - ('Exception raised on importing ctypes. Likely python bug.. ' - 'some functionality will be disabled'), - exc_info=True - ) + logging.warning('Exception raised on importing ctypes. Likely python bug.. some functionality will be disabled', exc_info = True) ctypes = None PyObject_HEAD = None else: @@ -91,11 +87,9 @@ except ImportError: from StringIO import StringIO - # These helper functions were copied from PiCloud's util module. def islambda(func): - return getattr(func, 'func_name') == '' - + return getattr(func,'func_name') == '' def xrange_params(xrangeobj): """Returns a 3 element tuple describing the xrange start, step, and len @@ -108,32 +102,31 @@ def xrange_params(xrangeobj): """ xrange_len = len(xrangeobj) - if not xrange_len: # empty - return (0, 1, 0) + if not xrange_len: #empty + return (0,1,0) start = xrangeobj[0] - if xrange_len == 1: # one element + if xrange_len == 1: #one element return start, 1, 1 return (start, xrangeobj[1] - xrangeobj[0], xrange_len) -# debug variables intended for developer use: +#debug variables intended for developer use: printSerialization = False printMemoization = False -useForcedImports = True # Should I use forced imports for tracking? +useForcedImports = True #Should I use forced imports for tracking? + class CloudPickler(pickle.Pickler): dispatch = pickle.Pickler.dispatch.copy() savedForceImports = False - savedDjangoEnv = False # hack tro transport django environment + savedDjangoEnv = False #hack tro transport django environment - def __init__(self, file, protocol=None, min_size_to_save=0): - pickle.Pickler.__init__(self, file, protocol) - self.modules = set() # set of modules needed to depickle - # map ids to dictionary. 
used to ensure that - # functions can share global env - self.globals_ref = {} + def __init__(self, file, protocol=None, min_size_to_save= 0): + pickle.Pickler.__init__(self,file,protocol) + self.modules = set() #set of modules needed to depickle + self.globals_ref = {} # map ids to dictionary. used to ensure that functions can share global env def dump(self, obj): # note: not thread safe @@ -157,57 +150,59 @@ def dump(self, obj): def save_buffer(self, obj): """Fallback to save_string""" - pickle.Pickler.save_string(self, str(obj)) + pickle.Pickler.save_string(self,str(obj)) dispatch[buffer] = save_buffer - # block broken objects + #block broken objects def save_unsupported(self, obj, pack=None): raise pickle.PicklingError("Cannot pickle objects of type %s" % type(obj)) dispatch[types.GeneratorType] = save_unsupported - # python2.6+ supports slice pickling. some py2.5 extensions might as well. We just test it + #python2.6+ supports slice pickling. some py2.5 extensions might as well. We just test it try: - slice(0, 1).__reduce__() - except TypeError: # can't pickle - + slice(0,1).__reduce__() + except TypeError: #can't pickle - dispatch[slice] = save_unsupported - # itertools objects do not pickle! + #itertools objects do not pickle! for v in itertools.__dict__.values(): if type(v) is type: dispatch[v] = save_unsupported + def save_dict(self, obj): """hack fix If the dict is a global, deal with it in a special way """ - # print 'saving', obj + #print 'saving', obj if obj is __builtins__: self.save_reduce(_get_module_builtins, (), obj=obj) else: pickle.Pickler.save_dict(self, obj) dispatch[pickle.DictionaryType] = save_dict + def save_module(self, obj, pack=struct.pack): """ Save a module as an import """ - # print 'try save import', obj.__name__ + #print 'try save import', obj.__name__ self.modules.add(obj) - self.save_reduce(subimport, (obj.__name__,), obj=obj) - dispatch[types.ModuleType] = save_module # new type + self.save_reduce(subimport,(obj.__name__,), obj=obj) + dispatch[types.ModuleType] = save_module #new type def save_codeobject(self, obj, pack=struct.pack): """ Save a code object """ - # print 'try to save codeobj: ', obj + #print 'try to save codeobj: ', obj args = ( obj.co_argcount, obj.co_nlocals, obj.co_stacksize, obj.co_flags, obj.co_code, obj.co_consts, obj.co_names, obj.co_varnames, obj.co_filename, obj.co_name, obj.co_firstlineno, obj.co_lnotab, obj.co_freevars, obj.co_cellvars ) self.save_reduce(types.CodeType, args, obj=obj) - dispatch[types.CodeType] = save_codeobject # new type + dispatch[types.CodeType] = save_codeobject #new type def save_function(self, obj, name=None, pack=struct.pack): """ Registered with the dispatch to handle all function types. 
@@ -219,12 +214,10 @@ def save_function(self, obj, name=None, pack=struct.pack): name = obj.__name__ modname = pickle.whichmodule(obj, name) - # print 'which gives %s %s %s' % (modname, obj, name) + #print 'which gives %s %s %s' % (modname, obj, name) try: themodule = sys.modules[modname] - except KeyError: - # eval'd items such as namedtuple give invalid items - # for their function __module__ + except KeyError: # eval'd items such as namedtuple give invalid items for their function __module__ modname = '__main__' if modname == '__main__': @@ -234,29 +227,28 @@ def save_function(self, obj, name=None, pack=struct.pack): self.modules.add(themodule) if not self.savedDjangoEnv: - # hack for django - if we detect the settings module, we transport it + #hack for django - if we detect the settings module, we transport it django_settings = os.environ.get('DJANGO_SETTINGS_MODULE', '') if django_settings: django_mod = sys.modules.get(django_settings) if django_mod: - cloudLog.debug( - 'Transporting django settings %s during save of %s', - django_mod, name) + cloudLog.debug('Transporting django settings %s during save of %s', django_mod, name) self.savedDjangoEnv = True self.modules.add(django_mod) write(pickle.MARK) self.save_reduce(django_settings_load, (django_mod.__name__,), obj=django_mod) write(pickle.POP_MARK) + # if func is lambda, def'ed at prompt, is in main, or is nested, then # we'll pickle the actual function object rather than simply saving a # reference (as is done in default pickler), via save_function_tuple. if islambda(obj) or obj.func_code.co_filename == '' or themodule is None: - # Force server to import modules that have been imported in main + #Force server to import modules that have been imported in main modList = None if themodule is None and not self.savedForceImports: mainmod = sys.modules['__main__'] - if useForcedImports and hasattr(mainmod, '___pyc_forcedImports__'): + if useForcedImports and hasattr(mainmod,'___pyc_forcedImports__'): modList = list(mainmod.___pyc_forcedImports__) self.savedForceImports = True self.save_function_tuple(obj, modList) @@ -298,12 +290,12 @@ def save_function_tuple(self, func, forced_imports): if forced_imports: write(pickle.MARK) save(_modules_to_main) - # print 'forced imports are', forced_imports + #print 'forced imports are', forced_imports forced_names = map(lambda m: m.__name__, forced_imports) save((forced_names,)) - # save((forced_imports,)) + #save((forced_imports,)) write(pickle.REDUCE) write(pickle.POP_MARK) @@ -350,7 +342,7 @@ def extract_code_globals(co): extended_arg = oparg*65536L if op in GLOBAL_OPS: out_names.add(names[oparg]) - # print 'extracted', out_names, ' from ', names + #print 'extracted', out_names, ' from ', names return out_names def extract_func_data(self, func): @@ -364,14 +356,13 @@ def extract_func_data(self, func): func_global_refs = CloudPickler.extract_code_globals(code) if code.co_consts: # see if nested function have any global refs for const in code.co_consts: - if isinstance(const, types.CodeType) and const.co_names: - func_global_refs = func_global_refs.union( - CloudPickler.extract_code_globals(const)) + if type(const) is types.CodeType and const.co_names: + func_global_refs = func_global_refs.union( CloudPickler.extract_code_globals(const)) # process all variables referenced by global environment f_globals = {} for var in func_global_refs: - # Some names, such as class functions are not global - we don't need them - if var in func.func_globals: + #Some names, such as class functions are not global - 
we don't need them + if func.func_globals.has_key(var): f_globals[var] = func.func_globals[var] # defaults requires no processing @@ -380,10 +371,9 @@ def extract_func_data(self, func): def get_contents(cell): try: return cell.cell_contents - except ValueError, e: # cell is empty error on not yet assigned - raise pickle.PicklingError( - ('Function to be pickled has free variables that are referenced before ' - 'assignment in enclosing scope')) + except ValueError, e: #cell is empty error on not yet assigned + raise pickle.PicklingError('Function to be pickled has free variables that are referenced before assignment in enclosing scope') + # process closure if func.func_closure: @@ -395,7 +385,7 @@ def get_contents(cell): dct = func.func_dict if printSerialization: - outvars = ['code: ' + str(code)] + outvars = ['code: ' + str(code) ] outvars.append('globals: ' + str(f_globals)) outvars.append('defaults: ' + str(defaults)) outvars.append('closure: ' + str(closure)) @@ -420,7 +410,7 @@ def save_global(self, obj, name=None, pack=struct.pack): try: __import__(modname) themodule = sys.modules[modname] - except (ImportError, KeyError, AttributeError): # should never occur + except (ImportError, KeyError, AttributeError): #should never occur raise pickle.PicklingError( "Can't pickle %r: Module %s cannot be found" % (obj, modname)) @@ -433,48 +423,46 @@ def save_global(self, obj, name=None, pack=struct.pack): sendRef = True typ = type(obj) - # print 'saving', obj, typ + #print 'saving', obj, typ try: - try: # Deal with case when getattribute fails with exceptions + try: #Deal with case when getattribute fails with exceptions klass = getattr(themodule, name) except (AttributeError): - if modname == '__builtin__': # new.* are misrepeported + if modname == '__builtin__': #new.* are misrepeported modname = 'new' __import__(modname) themodule = sys.modules[modname] try: klass = getattr(themodule, name) except AttributeError, a: - # print themodule, name, obj, type(obj) + #print themodule, name, obj, type(obj) raise pickle.PicklingError("Can't pickle builtin %s" % obj) else: raise except (ImportError, KeyError, AttributeError): - if isinstance(obj, types.TypeType) or isinstance(obj, types.ClassType): + if typ == types.TypeType or typ == types.ClassType: sendRef = False - else: # we can't deal with this + else: #we can't deal with this raise else: - if klass is not obj and - (isinstance(obj, types.TypeType) or isinstance(obj, types.ClassType)): + if klass is not obj and (typ == types.TypeType or typ == types.ClassType): sendRef = False if not sendRef: - # note: Third party types might crash this - add better checks! - d = dict(obj.__dict__) # copy dict proxy to a dict - - # don't extract dict that are properties - if not isinstance(d.get('__dict__', None), property): - d.pop('__dict__', None) - d.pop('__weakref__', None) + #note: Third party types might crash this - add better checks! 
+ d = dict(obj.__dict__) #copy dict proxy to a dict + if not isinstance(d.get('__dict__', None), property): # don't extract dict that are properties + d.pop('__dict__',None) + d.pop('__weakref__',None) # hack as __new__ is stored differently in the __dict__ new_override = d.get('__new__', None) if new_override: d['__new__'] = obj.__new__ - self.save_reduce(type(obj), (obj.__name__, obj.__bases__, d), obj=obj) - # print 'internal reduce dask %s %s' % (obj, d) + self.save_reduce(type(obj),(obj.__name__,obj.__bases__, + d),obj=obj) + #print 'internal reduce dask %s %s' % (obj, d) return if self.proto >= 2: @@ -484,7 +472,7 @@ def save_global(self, obj, name=None, pack=struct.pack): if code <= 0xff: write(pickle.EXT1 + chr(code)) elif code <= 0xffff: - write("%c%c%c" % (pickle.EXT2, code & 0xff, code >> 8)) + write("%c%c%c" % (pickle.EXT2, code&0xff, code>>8)) else: write(pickle.EXT4 + pack("= 2 and getattr(func, "__name__", "") == "__newobj__": - # Added fix to allow transient + #Added fix to allow transient cls = args[0] if not hasattr(cls, "__new__"): raise pickle.PicklingError( @@ -606,7 +596,7 @@ def save_reduce(self, func, args, state=None, args = args[1:] save(cls) - # Don't pickle transient entries + #Don't pickle transient entries if hasattr(obj, '__transient__'): transient = obj.__transient__ state = state.copy() @@ -637,44 +627,46 @@ def save_reduce(self, func, args, state=None, self._batch_setitems(dictitems) if state is not None: - # print 'obj %s has state %s' % (obj, state) + #print 'obj %s has state %s' % (obj, state) save(state) write(pickle.BUILD) + def save_xrange(self, obj): """Save an xrange object in python 2.5 Python 2.6 supports this natively """ range_params = xrange_params(obj) - self.save_reduce(_build_xrange, range_params) + self.save_reduce(_build_xrange,range_params) - # python2.6+ supports xrange pickling. some py2.5 extensions might as well. We just test it + #python2.6+ supports xrange pickling. some py2.5 extensions might as well. 
We just test it try: xrange(0).__reduce__() - except TypeError: # can't pickle -- use PiCloud pickler + except TypeError: #can't pickle -- use PiCloud pickler dispatch[xrange] = save_xrange def save_partial(self, obj): """Partial objects do not serialize correctly in python2.x -- this fixes the bugs""" self.save_reduce(_genpartial, (obj.func, obj.args, obj.keywords)) - if sys.version_info < (2, 7): # 2.7 supports partial pickling + if sys.version_info < (2,7): #2.7 supports partial pickling dispatch[partial] = save_partial + def save_file(self, obj): """Save a file""" - import StringIO as pystringIO # we can't use cStringIO as it lacks the name attribute + import StringIO as pystringIO #we can't use cStringIO as it lacks the name attribute from ..transport.adapter import SerializingAdapter - if not hasattr(obj, 'name') or not hasattr(obj, 'mode'): + if not hasattr(obj, 'name') or not hasattr(obj, 'mode'): raise pickle.PicklingError("Cannot pickle files that do not map to an actual file") if obj.name == '': - return self.save_reduce(getattr, (sys, 'stdout'), obj=obj) + return self.save_reduce(getattr, (sys,'stdout'), obj=obj) if obj.name == '': - return self.save_reduce(getattr, (sys, 'stderr'), obj=obj) + return self.save_reduce(getattr, (sys,'stderr'), obj=obj) if obj.name == '': raise pickle.PicklingError("Cannot pickle standard input") - if hasattr(obj, 'isatty') and obj.isatty(): + if hasattr(obj, 'isatty') and obj.isatty(): raise pickle.PicklingError("Cannot pickle files that map to tty objects") if 'r' not in obj.mode: raise pickle.PicklingError("Cannot pickle files that are not opened for reading") @@ -685,10 +677,10 @@ def save_file(self, obj): raise pickle.PicklingError("Cannot pickle file %s as it cannot be stat" % name) if obj.closed: - # create an empty closed string io + #create an empty closed string io retval = pystringIO.StringIO("") retval.close() - elif not fsize: # empty file + elif not fsize: #empty file retval = pystringIO.StringIO("") try: tmpfile = file(name) @@ -697,13 +689,10 @@ def save_file(self, obj): raise pickle.PicklingError("Cannot pickle file %s as it cannot be read" % name) tmpfile.close() if tst != '': - raise pickle.PicklingError( - ("Cannot pickle file %s as it does not appear to map to " - "a physical, real file") % name) + raise pickle.PicklingError("Cannot pickle file %s as it does not appear to map to a physical, real file" % name) elif fsize > SerializingAdapter.max_transmit_data: - raise pickle.PicklingError( - "Cannot pickle file %s as it exceeds cloudconf.py's max_transmit_data of %d" % - (name, SerializingAdapter.max_transmit_data)) + raise pickle.PicklingError("Cannot pickle file %s as it exceeds cloudconf.py's max_transmit_data of %d" % + (name,SerializingAdapter.max_transmit_data)) else: try: tmpfile = file(name) @@ -716,7 +705,7 @@ def save_file(self, obj): retval.seek(curloc) retval.name = name - self.save(retval) # save stringIO + self.save(retval) #save stringIO self.memoize(obj) dispatch[file] = save_file @@ -729,7 +718,6 @@ def inject_numpy(self): self.dispatch[numpy.ufunc] = self.__class__.save_ufunc numpy_tst_mods = ['numpy', 'scipy.special'] - def save_ufunc(self, obj): """Hack function for saving numpy ufunc objects""" name = obj.__name__ @@ -739,8 +727,7 @@ def save_ufunc(self, obj): if name in tst_mod.__dict__: self.save_reduce(_getobject, (tst_mod_name, name)) return - raise pickle.PicklingError( - 'cannot save %s. Cannot resolve what module it is defined in' % str(obj)) + raise pickle.PicklingError('cannot save %s. 
Cannot resolve what module it is defined in' % str(obj)) def inject_timeseries(self): """Handle bugs with pickling scikits timeseries""" @@ -754,22 +741,20 @@ def save_timeseries(self, obj): func, reduce_args, state = obj.__reduce__() if func != ts._tsreconstruct: - raise pickle.PicklingError( - 'timeseries using unexpected reconstruction function %s' % str(func)) - state = ( - 1, - obj.shape, - obj.dtype, - obj.flags.fnc, - obj._data.tostring(), - ts.getmaskarray(obj).tostring(), - obj._fill_value, - obj._dates.shape, - obj._dates.__array__().tostring(), - obj._dates.dtype, # added -- preserve type - obj.freq, - obj._optinfo, - ) + raise pickle.PicklingError('timeseries using unexpected reconstruction function %s' % str(func)) + state = (1, + obj.shape, + obj.dtype, + obj.flags.fnc, + obj._data.tostring(), + ts.getmaskarray(obj).tostring(), + obj._fill_value, + obj._dates.shape, + obj._dates.__array__().tostring(), + obj._dates.dtype, #added -- preserve type + obj.freq, + obj._optinfo, + ) return self.save_reduce(_genTimeSeries, (reduce_args, state)) def inject_email(self): @@ -787,12 +772,12 @@ def inject_addons(self): """Python Imaging Library""" def save_image(self, obj): - if not obj.im and obj.fp and 'r' in obj.fp.mode and obj.fp.name and - not obj.fp.closed and (not hasattr(obj, 'isatty') or not obj.isatty()): - # if image not loaded yet -- lazy load - self.save_reduce(_lazyloadImage, (obj.fp,), obj=obj) + if not obj.im and obj.fp and 'r' in obj.fp.mode and obj.fp.name \ + and not obj.fp.closed and (not hasattr(obj, 'isatty') or not obj.isatty()): + #if image not loaded yet -- lazy load + self.save_reduce(_lazyloadImage,(obj.fp,), obj=obj) else: - # image is loaded - just transmit it over + #image is loaded - just transmit it over self.save_reduce(_generateImage, (obj.size, obj.mode, obj.tostring()), obj=obj) """ @@ -803,35 +788,34 @@ def memoize(self, obj): """ + # Shorthands for legacy support def dump(obj, file, protocol=2): CloudPickler(file, protocol).dump(obj) - def dumps(obj, protocol=2): file = StringIO() - cp = CloudPickler(file, protocol) + cp = CloudPickler(file,protocol) cp.dump(obj) - # print 'cloud dumped', str(obj), str(cp.modules) + #print 'cloud dumped', str(obj), str(cp.modules) return file.getvalue() -# hack for __import__ not working as desired +#hack for __import__ not working as desired def subimport(name): __import__(name) return sys.modules[name] - -# hack to load django settings: +#hack to load django settings: def django_settings_load(name): modified_env = False if 'DJANGO_SETTINGS_MODULE' not in os.environ: - os.environ['DJANGO_SETTINGS_MODULE'] = name # must set name first due to circular deps + os.environ['DJANGO_SETTINGS_MODULE'] = name # must set name first due to circular deps modified_env = True try: module = subimport(name) @@ -841,28 +825,24 @@ def django_settings_load(name): if modified_env: del os.environ['DJANGO_SETTINGS_MODULE'] else: - # add project directory to sys,path: - if hasattr(module, '__file__'): + #add project directory to sys,path: + if hasattr(module,'__file__'): dirname = os.path.split(module.__file__)[0] + '/' sys.path.append(dirname) - # restores function attributes def _restore_attr(obj, attr): for key, val in attr.items(): setattr(obj, key, val) return obj - def _get_module_builtins(): return pickle.__builtins__ - def print_exec(stream): ei = sys.exc_info() traceback.print_exception(ei[0], ei[1], ei[2], None, stream) - def _modules_to_main(modList): """Force every module in modList to be placed into main""" if not modList: @@ 
-873,24 +853,22 @@ def _modules_to_main(modList): if type(modname) is str: try: mod = __import__(modname) - except Exception, i: # catch all... + except Exception, i: #catch all... sys.stderr.write('warning: could not import %s\n. Your function may unexpectedly error due to this import failing; \ A version mismatch is likely. Specific error was:\n' % modname) print_exec(sys.stderr) else: - setattr(main, mod.__name__, mod) + setattr(main,mod.__name__, mod) else: - # REVERSE COMPATIBILITY FOR CLOUD CLIENT 1.5 (WITH EPD) - # In old version actual module was sent - setattr(main, modname.__name__, modname) - + #REVERSE COMPATIBILITY FOR CLOUD CLIENT 1.5 (WITH EPD) + #In old version actual module was sent + setattr(main,modname.__name__, modname) -# object generators: +#object generators: def _build_xrange(start, step, len): """Built xrange explicitly""" return xrange(start, start + step*len, step) - def _genpartial(func, args, kwds): if not args: args = () @@ -914,13 +892,12 @@ def _fill_function(func, globals, defaults, closure, dict): return func - -def _make_skel_func(code, num_closures, base_globals=None): +def _make_skel_func(code, num_closures, base_globals = None): """ Creates a skeleton function object that contains just the provided code and the correct number of cells in func_closure. All other func attributes (e.g. func_globals) are empty. """ - # build closure (cells): + #build closure (cells): if not ctypes: raise Exception('ctypes failed to import; cannot build function') @@ -948,21 +925,17 @@ def _make_skel_func(code, num_closures, base_globals=None): (), (), ('newval',), '', 'cell_changer', 1, '', ('c',), () ) - def _change_cell_value(cell, newval): """ Changes the contents of 'cell' object to newval """ return new.function(cell_changer_code, {}, None, (), (cell,))(newval) - """Constructors for 3rd party libraries Note: These can never be renamed due to client compatibility issues""" - def _getobject(modname, attribute): mod = __import__(modname, fromlist=[attribute]) return mod.__dict__[attribute] - def _generateImage(size, mode, str_rep): """Generate image from string representation""" import Image @@ -970,34 +943,32 @@ def _generateImage(size, mode, str_rep): i.fromstring(str_rep) return i - def _lazyloadImage(fp): import Image - fp.seek(0) # works in almost any case + fp.seek(0) #works in almost any case return Image.open(fp) - """Timeseries""" - - def _genTimeSeries(reduce_args, state): import scikits.timeseries.tseries as ts from numpy import ndarray from numpy.ma import MaskedArray + time_series = ts._tsreconstruct(*reduce_args) - # from setstate modified + #from setstate modified (ver, shp, typ, isf, raw, msk, flv, dsh, dtm, dtyp, frq, infodict) = state - # print 'regenerating %s' % dtyp + #print 'regenerating %s' % dtyp MaskedArray.__setstate__(time_series, (ver, shp, typ, isf, raw, msk, flv)) _dates = time_series._dates - # _dates.__setstate__((ver, dsh, typ, isf, dtm, frq)) #use remote typ - ndarray.__setstate__(_dates, (dsh, dtyp, isf, dtm)) + #_dates.__setstate__((ver, dsh, typ, isf, dtm, frq)) #use remote typ + ndarray.__setstate__(_dates,(dsh,dtyp, isf, dtm)) _dates.freq = frq _dates._cachedinfo.update(dict(full=None, hasdups=None, steps=None, toobj=None, toord=None, tostr=None)) # Update the _optinfo dictionary time_series._optinfo.update(infodict) return time_series +
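
Editor's note (illustrative, not part of the patch series): patches 24, 25, and 27 above all apply the same PEP 8 technique -- continuing a long statement inside parentheses instead of ending a line on a bare operator or leaning on a backslash, which PEP 8 prefers because implicit continuations survive re-indentation and reflowing. The sketch below shows the pattern in isolation; `Stats`, `describe`, and `is_same_context` are hypothetical stand-ins for illustration, not code taken from PySpark.

    # Illustrative sketch only; not code from any patch above.

    class Stats(object):
        """Hypothetical stand-in for pyspark.statcounter.StatCounter."""

        def __init__(self, values):
            self.values = list(values)

        def count(self):
            return len(self.values)

        def mean(self):
            return float(sum(self.values)) / len(self.values)


    def describe(stats):
        # Wrap the whole expression in parentheses so the '%' operator can
        # end a line without a backslash (the style applied to
        # statcounter.py in patch 27).
        return ("(count: %s, mean: %s)" %
                (stats.count(), stats.mean()))


    def is_same_context(active, instance):
        # Parenthesized multi-line conditional (the style applied to
        # context.py in patch 24); without the outer parentheses the
        # dangling 'and' is a syntax error.
        return (active is not None and
                active is instance)


    if __name__ == "__main__":
        print(describe(Stats([1, 2, 3])))  # runs under Python 2 or 3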