diff --git a/python/pyspark/cloudpickle.py b/python/pyspark/cloudpickle.py
index 4fda2a9b950b8..07514d7985661 100644
--- a/python/pyspark/cloudpickle.py
+++ b/python/pyspark/cloudpickle.py
@@ -55,7 +55,7 @@
 import dis
 import traceback
 
-#relevant opcodes
+# relevant opcodes
 STORE_GLOBAL = chr(dis.opname.index('STORE_GLOBAL'))
 DELETE_GLOBAL = chr(dis.opname.index('DELETE_GLOBAL'))
 LOAD_GLOBAL = chr(dis.opname.index('LOAD_GLOBAL'))
@@ -70,7 +70,9 @@
 try:
     import ctypes
 except (MemoryError, ImportError):
-    logging.warning('Exception raised on importing ctypes. Likely python bug.. some functionality will be disabled', exc_info = True)
+    logging.warning('Exception raised on importing ctypes. \
+Likely python bug.. some functionality will be disabled',
+                    exc_info=True)
     ctypes = None
     PyObject_HEAD = None
 else:
@@ -87,9 +89,11 @@
 except ImportError:
     from StringIO import StringIO
 
+
 # These helper functions were copied from PiCloud's util module.
 def islambda(func):
-    return getattr(func,'func_name') == '<lambda>'
+    return getattr(func, 'func_name') == '<lambda>'
+
 
 def xrange_params(xrangeobj):
     """Returns a 3 element tuple describing the xrange start, step, and len
@@ -102,31 +106,31 @@ def xrange_params(xrangeobj):
     """
     xrange_len = len(xrangeobj)
-    if not xrange_len: #empty
-        return (0,1,0)
+    if not xrange_len:  # empty
+        return (0, 1, 0)
     start = xrangeobj[0]
-    if xrange_len == 1: #one element
+    if xrange_len == 1:  # one element
         return start, 1, 1
     return (start, xrangeobj[1] - xrangeobj[0], xrange_len)
 
-#debug variables intended for developer use:
+# debug variables intended for developer use:
 printSerialization = False
 printMemoization = False
 
-useForcedImports = True #Should I use forced imports for tracking?
-
+useForcedImports = True  # Should I use forced imports for tracking?
 
 class CloudPickler(pickle.Pickler):
 
     dispatch = pickle.Pickler.dispatch.copy()
 
     savedForceImports = False
-    savedDjangoEnv = False #hack tro transport django environment
+    savedDjangoEnv = False  # hack to transport django environment
 
-    def __init__(self, file, protocol=None, min_size_to_save= 0):
-        pickle.Pickler.__init__(self,file,protocol)
-        self.modules = set() #set of modules needed to depickle
-        self.globals_ref = {}  # map ids to dictionary. used to ensure that functions can share global env
+    def __init__(self, file, protocol=None, min_size_to_save=0):
+        pickle.Pickler.__init__(self, file, protocol)
+        self.modules = set()  # set of modules needed to depickle
+        self.globals_ref = {}  # map ids to dictionary.
+        # used to ensure that functions can share global env
 
     def dump(self, obj):
         # note: not thread safe
@@ -150,62 +154,63 @@ def dump(self, obj):
 
     def save_buffer(self, obj):
         """Fallback to save_string"""
-        pickle.Pickler.save_string(self,str(obj))
+        pickle.Pickler.save_string(self, str(obj))
     dispatch[buffer] = save_buffer
 
-    #block broken objects
+    # block broken objects
     def save_unsupported(self, obj, pack=None):
         raise pickle.PicklingError("Cannot pickle objects of type %s" % type(obj))
     dispatch[types.GeneratorType] = save_unsupported
 
-    #python2.6+ supports slice pickling. some py2.5 extensions might as well. We just test it
+    # python2.6+ supports slice pickling. some py2.5 extensions might as well. We just test it
     try:
-        slice(0,1).__reduce__()
-    except TypeError: #can't pickle -
+        slice(0, 1).__reduce__()
+    except TypeError:  # can't pickle
         dispatch[slice] = save_unsupported
 
-    #itertools objects do not pickle!
+    # itertools objects do not pickle!
     for v in itertools.__dict__.values():
         if type(v) is type:
             dispatch[v] = save_unsupported
 
-
     def save_dict(self, obj):
-        """hack fix
+        """
+        hack fix
         If the dict is a global, deal with it in a special way
         """
-        #print 'saving', obj
+        # print 'saving', obj
         if obj is __builtins__:
             self.save_reduce(_get_module_builtins, (), obj=obj)
         else:
             pickle.Pickler.save_dict(self, obj)
     dispatch[pickle.DictionaryType] = save_dict
 
-
     def save_module(self, obj, pack=struct.pack):
         """
         Save a module as an import
         """
-        #print 'try save import', obj.__name__
+        # print 'try save import', obj.__name__
         self.modules.add(obj)
-        self.save_reduce(subimport,(obj.__name__,), obj=obj)
-    dispatch[types.ModuleType] = save_module #new type
+        self.save_reduce(subimport, (obj.__name__,), obj=obj)
+    dispatch[types.ModuleType] = save_module  # new type
+
     def save_codeobject(self, obj, pack=struct.pack):
         """
         Save a code object
         """
-        #print 'try to save codeobj: ', obj
+        # print 'try to save codeobj: ', obj
         args = (
             obj.co_argcount, obj.co_nlocals, obj.co_stacksize, obj.co_flags, obj.co_code,
             obj.co_consts, obj.co_names, obj.co_varnames, obj.co_filename, obj.co_name,
             obj.co_firstlineno, obj.co_lnotab, obj.co_freevars, obj.co_cellvars
         )
         self.save_reduce(types.CodeType, args, obj=obj)
-    dispatch[types.CodeType] = save_codeobject #new type
+    dispatch[types.CodeType] = save_codeobject  # new type
 
     def save_function(self, obj, name=None, pack=struct.pack):
-        """ Registered with the dispatch to handle all function types.
+        """
+        Registered with the dispatch to handle all function types.
 
         Determines what kind of function obj is (e.g. lambda, defined at
         interactive prompt, etc) and handles the pickling appropriately.
@@ -214,10 +219,11 @@ def save_function(self, obj, name=None, pack=struct.pack):
             name = obj.__name__
         modname = pickle.whichmodule(obj, name)
-        #print 'which gives %s %s %s' % (modname, obj, name)
+        # print 'which gives %s %s %s' % (modname, obj, name)
 
         try:
             themodule = sys.modules[modname]
-        except KeyError: # eval'd items such as namedtuple give invalid items for their function __module__
+        except KeyError:  # eval'd items such as namedtuple give invalid
+            # items for their function __module__
             modname = '__main__'
 
         if modname == '__main__':
@@ -227,7 +233,7 @@ def save_function(self, obj, name=None, pack=struct.pack):
             self.modules.add(themodule)
 
         if not self.savedDjangoEnv:
-            #hack for django - if we detect the settings module, we transport it
+            # hack for django - if we detect the settings module, we transport it
             django_settings = os.environ.get('DJANGO_SETTINGS_MODULE', '')
             if django_settings:
                 django_mod = sys.modules.get(django_settings)
@@ -244,16 +250,16 @@ def save_function(self, obj, name=None, pack=struct.pack):
         # we'll pickle the actual function object rather than simply saving a
         # reference (as is done in default pickler), via save_function_tuple.
         if islambda(obj) or obj.func_code.co_filename == '<stdin>' or themodule is None:
-            #Force server to import modules that have been imported in main
+            # Force server to import modules that have been imported in main
             modList = None
             if themodule is None and not self.savedForceImports:
                 mainmod = sys.modules['__main__']
-                if useForcedImports and hasattr(mainmod,'___pyc_forcedImports__'):
+                if useForcedImports and hasattr(mainmod, '___pyc_forcedImports__'):
                     modList = list(mainmod.___pyc_forcedImports__)
                 self.savedForceImports = True
             self.save_function_tuple(obj, modList)
             return
-        else: # func is nested
+        else:  # func is nested
             klass = getattr(themodule, name, None)
             if klass is None or klass is not obj:
                 self.save_function_tuple(obj, [themodule])
@@ -272,7 +278,8 @@ def save_function(self, obj, name=None, pack=struct.pack):
     dispatch[types.FunctionType] = save_function
 
     def save_function_tuple(self, func, forced_imports):
-        """ Pickles an actual func object.
+        """
+        Pickles an actual func object.
 
         A func comprises: code, globals, defaults, closure, and dict. We
         extract and save these, injecting reducing functions at certain points
@@ -290,12 +297,12 @@ def save_function_tuple(self, func, forced_imports):
         if forced_imports:
             write(pickle.MARK)
             save(_modules_to_main)
-            #print 'forced imports are', forced_imports
+            # print 'forced imports are', forced_imports
 
             forced_names = map(lambda m: m.__name__, forced_imports)
             save((forced_names,))
 
-            #save((forced_imports,))
+            # save((forced_imports,))
             write(pickle.REDUCE)
             write(pickle.POP_MARK)
 
@@ -342,7 +349,7 @@ def extract_code_globals(co):
                     extended_arg = oparg*65536L
                 if op in GLOBAL_OPS:
                     out_names.add(names[oparg])
-        #print 'extracted', out_names, ' from ', names
+        # print 'extracted', out_names, ' from ', names
         return out_names
 
     def extract_func_data(self, func):
@@ -357,11 +364,11 @@ def extract_func_data(self, func):
         if code.co_consts:  # see if nested function have any global refs
             for const in code.co_consts:
                 if type(const) is types.CodeType and const.co_names:
-                    func_global_refs = func_global_refs.union( CloudPickler.extract_code_globals(const))
+                    func_global_refs = func_global_refs.union(CloudPickler.extract_code_globals(const))
         # process all variables referenced by global environment
         f_globals = {}
         for var in func_global_refs:
-            #Some names, such as class functions are not global - we don't need them
+            # Some names, such as class functions are not global - we don't need them
             if func.func_globals.has_key(var):
                 f_globals[var] = func.func_globals[var]
 
@@ -371,7 +378,7 @@ def extract_func_data(self, func):
         def get_contents(cell):
             try:
                 return cell.cell_contents
-            except ValueError, e: #cell is empty error on not yet assigned
+            except ValueError, e:  # cell is empty error on not yet assigned
                 raise pickle.PicklingError('Function to be pickled has free variables that are referenced before assignment in enclosing scope')
 
@@ -410,7 +417,7 @@ def save_global(self, obj, name=None, pack=struct.pack):
             try:
                 __import__(modname)
                 themodule = sys.modules[modname]
-            except (ImportError, KeyError, AttributeError): #should never occur
+            except (ImportError, KeyError, AttributeError):  # should never occur
                 raise pickle.PicklingError(
                     "Can't pickle %r: Module %s cannot be found" % (obj, modname))
@@ -423,19 +430,19 @@ def save_global(self, obj, name=None, pack=struct.pack):
 
         sendRef = True
         typ = type(obj)
-        #print 'saving', obj, typ
+        # print 'saving', obj, typ
         try:
-            try: #Deal with case when getattribute fails with exceptions
+            try:  # Deal with case when getattribute fails with exceptions
                 klass = getattr(themodule, name)
             except (AttributeError):
-                if modname == '__builtin__': #new.* are misrepeported
+                if modname == '__builtin__':  # new.* are misreported
                     modname = 'new'
                     __import__(modname)
                     themodule = sys.modules[modname]
                     try:
                         klass = getattr(themodule, name)
                     except AttributeError, a:
-                        #print themodule, name, obj, type(obj)
+                        # print themodule, name, obj, type(obj)
                         raise pickle.PicklingError("Can't pickle builtin %s" % obj)
                 else:
                     raise
@@ -443,26 +450,26 @@ def save_global(self, obj, name=None, pack=struct.pack):
         except (ImportError, KeyError, AttributeError):
             if typ == types.TypeType or typ == types.ClassType:
                 sendRef = False
-            else: #we can't deal with this
+            else:  # we can't deal with this
                 raise
         else:
             if klass is not obj and (typ == types.TypeType or typ == types.ClassType):
                 sendRef = False
 
         if not sendRef:
-            #note: Third party types might crash this - add better checks!
-            d = dict(obj.__dict__) #copy dict proxy to a dict
-            if not isinstance(d.get('__dict__', None), property): # don't extract dict that are properties
-                d.pop('__dict__',None)
-                d.pop('__weakref__',None)
+            # note: Third party types might crash this - add better checks!
+            d = dict(obj.__dict__)  # copy dict proxy to a dict
+            if not isinstance(d.get('__dict__', None), property):  # don't extract dict that are properties
+                d.pop('__dict__', None)
+                d.pop('__weakref__', None)
 
             # hack as __new__ is stored differently in the __dict__
             new_override = d.get('__new__', None)
             if new_override:
                 d['__new__'] = obj.__new__
 
-            self.save_reduce(type(obj),(obj.__name__,obj.__bases__,
-                                   d),obj=obj)
-            #print 'internal reduce dask %s %s' % (obj, d)
+            self.save_reduce(type(obj), (obj.__name__, obj.__bases__,
+                                         d), obj=obj)
+            # print 'internal reduce dask %s %s' % (obj, d)
             return
 
         if self.proto >= 2:
@@ -484,13 +491,15 @@ def save_global(self, obj, name=None, pack=struct.pack):
     dispatch[types.TypeType] = save_global
 
     def save_instancemethod(self, obj):
-        #Memoization rarely is ever useful due to python bounding
-        self.save_reduce(types.MethodType, (obj.im_func, obj.im_self,obj.im_class), obj=obj)
+        # Memoization rarely is ever useful due to python bounding
+        self.save_reduce(types.MethodType, (obj.im_func, obj.im_self, obj.im_class), obj=obj)
     dispatch[types.MethodType] = save_instancemethod
 
     def save_inst_logic(self, obj):
-        """Inner logic to save instance. Based off pickle.save_inst
-        Supports __transient__"""
+        """
+        Inner logic to save instance. Based off pickle.save_inst
+        Supports __transient__
+        """
         cls = obj.__class__
         memo = self.memo
@@ -499,7 +508,7 @@ def save_inst_logic(self, obj):
 
         if hasattr(obj, '__getinitargs__'):
             args = obj.__getinitargs__()
-            len(args) # XXX Assert it's a sequence
+            len(args)  # XXX Assert it's a sequence
             pickle._keep_alive(args, memo)
         else:
             args = ()
@@ -522,7 +531,7 @@ def save_inst_logic(self, obj):
             getstate = obj.__getstate__
         except AttributeError:
             stuff = obj.__dict__
-            #remove items if transient
+            # remove items if transient
             if hasattr(obj, '__transient__'):
                 transient = obj.__transient__
                 stuff = stuff.copy()
@@ -539,7 +548,7 @@ def save_inst(self, obj):
         # Hack to detect PIL Image instances without importing Imaging
         # PIL can be loaded with multiple names, so we don't check sys.modules for it
-        if hasattr(obj,'im') and hasattr(obj,'palette') and 'Image' in obj.__module__:
+        if hasattr(obj, 'im') and hasattr(obj, 'palette') and 'Image' in obj.__module__:
             self.save_image(obj)
         else:
             self.save_inst_logic(obj)
@@ -570,8 +579,10 @@ class ItemGetterType(ctypes.Structure):
 
     def save_reduce(self, func, args, state=None,
                     listitems=None, dictitems=None, obj=None):
-        """Modified to support __transient__ on new objects
-        Change only affects protocol level 2 (which is always used by PiCloud"""
+        """
+        Modified to support __transient__ on new objects
+        Change only affects protocol level 2 (which is always used by PiCloud)
+        """
         # Assert that args is a tuple or None
         if not isinstance(args, types.TupleType):
             raise pickle.PicklingError("args from reduce() should be a tuple")
@@ -585,7 +596,7 @@ def save_reduce(self, func, args, state=None,
 
         # Protocol 2 special case: if func's name is __newobj__, use NEWOBJ
         if self.proto >= 2 and getattr(func, "__name__", "") == "__newobj__":
-            #Added fix to allow transient
+            # Added fix to allow transient
             cls = args[0]
             if not hasattr(cls, "__new__"):
                 raise pickle.PicklingError(
@@ -596,7 +607,7 @@ def save_reduce(self, func, args, state=None,
             args = args[1:]
             save(cls)
 
-            #Don't pickle transient entries
+            # Don't pickle transient entries
             if hasattr(obj, '__transient__'):
                 transient = obj.__transient__
                 state = state.copy()
@@ -627,35 +638,36 @@ def save_reduce(self, func, args, state=None,
             self._batch_setitems(dictitems)
 
         if state is not None:
-            #print 'obj %s has state %s' % (obj, state)
+            # print 'obj %s has state %s' % (obj, state)
             save(state)
             write(pickle.BUILD)
 
     def save_xrange(self, obj):
-        """Save an xrange object in python 2.5
+        """
+        Save an xrange object in python 2.5
         Python 2.6 supports this natively
         """
         range_params = xrange_params(obj)
         self.save_reduce(_build_xrange,range_params)
 
-    #python2.6+ supports xrange pickling. some py2.5 extensions might as well. We just test it
+    # python2.6+ supports xrange pickling. some py2.5 extensions might as well. We just test it
     try:
         xrange(0).__reduce__()
-    except TypeError: #can't pickle -- use PiCloud pickler
+    except TypeError:  # can't pickle -- use PiCloud pickler
         dispatch[xrange] = save_xrange
 
     def save_partial(self, obj):
        """Partial objects do not serialize correctly in python2.x -- this fixes the bugs"""
        self.save_reduce(_genpartial, (obj.func, obj.args, obj.keywords))
 
-    if sys.version_info < (2,7): #2.7 supports partial pickling
+    if sys.version_info < (2,7):  # 2.7 supports partial pickling
         dispatch[partial] = save_partial
 
     def save_file(self, obj):
         """Save a file"""
-        import StringIO as pystringIO #we can't use cStringIO as it lacks the name attribute
+        import StringIO as pystringIO  # we can't use cStringIO as it lacks the name attribute
         from ..transport.adapter import SerializingAdapter
 
         if not hasattr(obj, 'name') or not hasattr(obj, 'mode'):
@@ -677,10 +689,10 @@ def save_file(self, obj):
             raise pickle.PicklingError("Cannot pickle file %s as it cannot be stat" % name)
 
         if obj.closed:
-            #create an empty closed string io
+            # create an empty closed string io
             retval = pystringIO.StringIO("")
             retval.close()
-        elif not fsize: #empty file
+        elif not fsize:  # empty file
             retval = pystringIO.StringIO("")
             try:
                 tmpfile = file(name)
@@ -705,7 +717,7 @@ def save_file(self, obj):
             retval.seek(curloc)
 
         retval.name = name
-        self.save(retval) #save stringIO
+        self.save(retval)  # save stringIO
         self.memoize(obj)
 
     dispatch[file] = save_file
@@ -751,7 +763,7 @@ def save_timeseries(self, obj):
                 obj._fill_value,
                 obj._dates.shape,
                 obj._dates.__array__().tostring(),
-                obj._dates.dtype, #added -- preserve type
+                obj._dates.dtype,  # added -- preserve type
                 obj.freq,
                 obj._optinfo,
             )
@@ -774,10 +786,10 @@ def inject_addons(self):
     def save_image(self, obj):
         if not obj.im and obj.fp and 'r' in obj.fp.mode and obj.fp.name \
                 and not obj.fp.closed and (not hasattr(obj, 'isatty') or not obj.isatty()):
-            #if image not loaded yet -- lazy load
+            # if image not loaded yet -- lazy load
             self.save_reduce(_lazyloadImage,(obj.fp,), obj=obj)
         else:
-            #image is loaded - just transmit it over
+            # image is loaded - just transmit it over
             self.save_reduce(_generateImage, (obj.size, obj.mode, obj.tostring()), obj=obj)
 
 """
@@ -800,22 +812,22 @@ def dumps(obj, protocol=2):
 
     cp = CloudPickler(file,protocol)
     cp.dump(obj)
-    #print 'cloud dumped', str(obj), str(cp.modules)
+    # print 'cloud dumped', str(obj), str(cp.modules)
 
     return file.getvalue()
 
-#hack for __import__ not working as desired
+# hack for __import__ not working as desired
 def subimport(name):
     __import__(name)
     return sys.modules[name]
 
 
-#hack to load django settings:
+# hack to load django settings:
 def django_settings_load(name):
     modified_env = False
 
     if 'DJANGO_SETTINGS_MODULE' not in os.environ:
-        os.environ['DJANGO_SETTINGS_MODULE'] = name # must set name first due to circular deps
+        os.environ['DJANGO_SETTINGS_MODULE'] = name  # must set name first due to circular deps
         modified_env = True
     try:
         module = subimport(name)
@@ -825,7 +837,7 @@ def django_settings_load(name):
         if modified_env:
             del os.environ['DJANGO_SETTINGS_MODULE']
     else:
-        #add project directory to sys,path:
+        # add project directory to sys.path:
         if hasattr(module,'__file__'):
             dirname = os.path.split(module.__file__)[0] + '/'
             sys.path.append(dirname)
@@ -853,18 +865,18 @@ def _modules_to_main(modList):
         if type(modname) is str:
             try:
                 mod = __import__(modname)
-            except Exception, i: #catch all...
+            except Exception, i:  # catch all...
                 sys.stderr.write('warning: could not import %s\n.  Your function may unexpectedly error due to this import failing; \
 A version mismatch is likely. Specific error was:\n' % modname)
                 print_exec(sys.stderr)
             else:
                 setattr(main,mod.__name__, mod)
         else:
-            #REVERSE COMPATIBILITY FOR CLOUD CLIENT 1.5 (WITH EPD)
-            #In old version actual module was sent
+            # REVERSE COMPATIBILITY FOR CLOUD CLIENT 1.5 (WITH EPD)
+            # In old version actual module was sent
             setattr(main,modname.__name__, modname)
 
-#object generators:
+# object generators:
 def _build_xrange(start, step, len):
     """Built xrange explicitly"""
     return xrange(start, start + step*len, step)
@@ -897,7 +909,7 @@ def _make_skel_func(code, num_closures, base_globals = None):
     code and the correct number of cells in func_closure.  All other
     func attributes (e.g. func_globals) are empty.
     """
-    #build closure (cells):
+    # build closure (cells):
     if not ctypes:
         raise Exception('ctypes failed to import; cannot build function')
 
@@ -945,7 +957,7 @@ def _generateImage(size, mode, str_rep):
 
 def _lazyloadImage(fp):
     import Image
-    fp.seek(0) #works in almost any case
+    fp.seek(0)  # works in almost any case
     return Image.open(fp)
 
 """Timeseries"""
@@ -957,18 +969,17 @@ def _genTimeSeries(reduce_args, state):
 
     time_series = ts._tsreconstruct(*reduce_args)
 
-    #from setstate modified
+    # from setstate modified
     (ver, shp, typ, isf, raw, msk, flv, dsh, dtm, dtyp, frq, infodict) = state
 
-    #print 'regenerating %s' % dtyp
+    # print 'regenerating %s' % dtyp
 
     MaskedArray.__setstate__(time_series, (ver, shp, typ, isf, raw, msk, flv))
     _dates = time_series._dates
-    #_dates.__setstate__((ver, dsh, typ, isf, dtm, frq)) #use remote typ
+    # _dates.__setstate__((ver, dsh, typ, isf, dtm, frq)) #use remote typ
     ndarray.__setstate__(_dates,(dsh,dtyp, isf, dtm))
     _dates.freq = frq
     _dates._cachedinfo.update(dict(full=None, hasdups=None, steps=None, toobj=None, toord=None, tostr=None))
     # Update the _optinfo dictionary
     time_series._optinfo.update(infodict)
-    return time_series
-
+    return time_series
\ No newline at end of file
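
Note for reviewers, outside the patch itself: save_function_tuple's docstring above describes a function as code, globals, defaults, closure, and dict. The snippet below is a minimal sketch of that decompose/rebuild round trip using plain types.FunctionType, not cloudpickle's injected reducers or its ctypes cell surgery; make_adder and clone are illustrative names, not cloudpickle API.

    import types

    def make_adder(n):
        return lambda x: x + n      # closure over n, touches no globals

    add5 = make_adder(5)

    # Decompose, as save_function_tuple conceptually does:
    code = add5.__code__            # the compiled body
    fglobals = {}                   # globals the code references (none here)
    defaults = add5.__defaults__    # default argument values (None here)
    closure = add5.__closure__      # cells; here one cell holding n == 5

    # Rebuild a working copy, as the depickling side conceptually does:
    clone = types.FunctionType(code, fglobals, 'add5_clone', defaults, closure)
    assert clone(2) == 7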
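In the same spirit, save_module reduces a module to subimport(obj.__name__): modules travel by name and are re-imported on the receiving side rather than being pickled by value. A self-contained sketch of that (func, args) reduce contract, using only the stdlib:

    import sys

    def subimport(name):
        # mirror of the helper in this patch: import and return the module
        __import__(name)
        return sys.modules[name]

    # What save_module effectively records: a callable plus its argument
    # tuple, the (func, args) shape that pickle's REDUCE opcode replays.
    reduced = (subimport, ('os',))
    func, args = reduced
    assert func(*args) is sys.modules['os']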
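save_reduce and save_inst_logic both honor a __transient__ attribute, dropping the listed names from the pickled state. The sketch below imitates that filtering with plain pickle and __getstate__; the Connection class is invented for the example, and cloudpickle performs the filtering inside the pickler rather than requiring a __getstate__:

    import pickle

    class Connection(object):
        __transient__ = ('sock',)       # state to drop when pickling

        def __init__(self):
            self.host = 'example.com'
            self.sock = object()        # stand-in for an unpicklable handle

        def __getstate__(self):
            state = self.__dict__.copy()
            for key in self.__transient__:  # same filtering idea as the patch
                state.pop(key, None)
            return state

    restored = pickle.loads(pickle.dumps(Connection()))
    assert restored.host == 'example.com'
    assert not hasattr(restored, 'sock')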
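Finally, xrange_params and _build_xrange round-trip an xrange through a (start, step, len) triple, which is why the docstring only guarantees element equality (e.g. xrange(1, 1) comes back as xrange(0, 0)). A quick Python 2 check of that round trip, with both helpers copied from the file:

    def xrange_params(xrangeobj):
        xrange_len = len(xrangeobj)
        if not xrange_len:  # empty
            return (0, 1, 0)
        start = xrangeobj[0]
        if xrange_len == 1:  # one element
            return start, 1, 1
        return (start, xrangeobj[1] - xrangeobj[0], xrange_len)

    def _build_xrange(start, step, len):
        return xrange(start, start + step*len, step)

    original = xrange(3, 20, 4)   # 3, 7, 11, 15, 19
    rebuilt = _build_xrange(*xrange_params(original))
    assert list(original) == list(rebuilt)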