From 11ccddab1db957c011d9160abcb5c084f43b43f4 Mon Sep 17 00:00:00 2001
From: MechCoder
Date: Wed, 12 Aug 2015 00:04:00 +0530
Subject: [PATCH 1/5] [SPARK-9828] [PySpark] Mutable values should not be
 default arguments

---
 python/pyspark/ml/param/__init__.py | 12 +++++++++---
 1 file changed, 9 insertions(+), 3 deletions(-)

diff --git a/python/pyspark/ml/param/__init__.py b/python/pyspark/ml/param/__init__.py
index 7845536161e0..a47ad5798845 100644
--- a/python/pyspark/ml/param/__init__.py
+++ b/python/pyspark/ml/param/__init__.py
@@ -155,7 +155,7 @@ def getOrDefault(self, param):
         else:
             return self._defaultParamMap[param]
 
-    def extractParamMap(self, extra={}):
+    def extractParamMap(self, extra=None):
         """
         Extracts the embedded default param values and user-supplied
         values, and then merges them with extra values from input into
@@ -165,12 +165,14 @@ def extractParamMap(self, extra={}):
         :param extra: extra param values
         :return: merged param map
         """
+        if extra is None:
+            extra = dict()
         paramMap = self._defaultParamMap.copy()
         paramMap.update(self._paramMap)
         paramMap.update(extra)
         return paramMap
 
-    def copy(self, extra={}):
+    def copy(self, extra=None):
         """
         Creates a copy of this instance with the same uid and some
         extra params. The default implementation creates a
@@ -181,6 +183,8 @@ def copy(self, extra={}):
         :param extra: Extra parameters to copy to the new instance
         :return: Copy of this instance
         """
+        if extra is None:
+            extra = dict()
         that = copy.copy(self)
         that._paramMap = self.extractParamMap(extra)
         return that
@@ -233,7 +237,7 @@ def _setDefault(self, **kwargs):
             self._defaultParamMap[getattr(self, param)] = value
         return self
 
-    def _copyValues(self, to, extra={}):
+    def _copyValues(self, to, extra=None):
         """
         Copies param values from this instance to another instance for
         params shared by them.
@@ -241,6 +245,8 @@ def _copyValues(self, to, extra={}):
         :param extra: extra params to be copied
         :return: the target instance with param values copied
         """
+        if extra is None:
+            extra = dict()
         paramMap = self.extractParamMap(extra)
         for p in self.params:
             if p in paramMap and to.hasParam(p.name):

From 5652a34be81619919e68e849554220af967b6431 Mon Sep 17 00:00:00 2001
From: MechCoder
Date: Wed, 12 Aug 2015 00:28:13 +0530
Subject: [PATCH 2/5] more mutable arguments

---
 python/pyspark/ml/evaluation.py   |  4 +++-
 python/pyspark/ml/tuning.py       |  8 ++++++--
 python/pyspark/rdd.py             |  4 +++-
 python/pyspark/sql/readwriter.py  |  8 ++++++--
 python/pyspark/statcounter.py     |  4 +++-
 python/pyspark/streaming/kafka.py | 12 +++++++++---
 6 files changed, 30 insertions(+), 10 deletions(-)

diff --git a/python/pyspark/ml/evaluation.py b/python/pyspark/ml/evaluation.py
index 06e809352225..34227a0512c8 100644
--- a/python/pyspark/ml/evaluation.py
+++ b/python/pyspark/ml/evaluation.py
@@ -45,7 +45,7 @@ def _evaluate(self, dataset):
         """
         raise NotImplementedError()
 
-    def evaluate(self, dataset, params={}):
+    def evaluate(self, dataset, params=None):
         """
         Evaluates the output with optional parameters.
 
@@ -55,6 +55,8 @@ def evaluate(self, dataset, params=None):
             params
         :return: metric
         """
+        if params is None:
+            params = dict()
         if isinstance(params, dict):
             if params:
                 return self.copy(params)._evaluate(dataset)
diff --git a/python/pyspark/ml/tuning.py b/python/pyspark/ml/tuning.py
index 0bf988fd72f1..dcfee6a3170a 100644
--- a/python/pyspark/ml/tuning.py
+++ b/python/pyspark/ml/tuning.py
@@ -227,7 +227,9 @@ def _fit(self, dataset):
         bestModel = est.fit(dataset, epm[bestIndex])
         return CrossValidatorModel(bestModel)
 
-    def copy(self, extra={}):
+    def copy(self, extra=None):
+        if extra is None:
+            extra = dict()
         newCV = Params.copy(self, extra)
         if self.isSet(self.estimator):
             newCV.setEstimator(self.getEstimator().copy(extra))
@@ -250,7 +252,7 @@ def __init__(self, bestModel):
     def _transform(self, dataset):
         return self.bestModel.transform(dataset)
 
-    def copy(self, extra={}):
+    def copy(self, extra=None):
         """
         Creates a copy of this instance with a randomly generated uid
         and some extra params. This copies the underlying bestModel,
@@ -259,6 +261,8 @@ def copy(self, extra={}):
         :param extra: Extra parameters to copy to the new instance
         :return: Copy of this instance
         """
+        if extra is None:
+            extra = dict()
         return CrossValidatorModel(self.bestModel.copy(extra))
 
 
diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index fa8e0a0574a6..325cf37c535a 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -700,7 +700,7 @@ def groupBy(self, f, numPartitions=None):
         return self.map(lambda x: (f(x), x)).groupByKey(numPartitions)
 
     @ignore_unicode_prefix
-    def pipe(self, command, env={}, checkCode=False):
+    def pipe(self, command, env=None, checkCode=False):
         """
         Return an RDD created by piping elements to a forked external process.
 
@@ -709,6 +709,8 @@ def pipe(self, command, env=None, checkCode=False):
         :param checkCode: whether or not to check the return value
                           of the shell command.
         """
+        if env is None:
+            env = dict()
         def func(iterator):
             pipe = Popen(
                 shlex.split(command), env=env, stdin=PIPE, stdout=PIPE)
diff --git a/python/pyspark/sql/readwriter.py b/python/pyspark/sql/readwriter.py
index bf6ac084bbbf..78247c8fa737 100644
--- a/python/pyspark/sql/readwriter.py
+++ b/python/pyspark/sql/readwriter.py
@@ -182,7 +182,7 @@ def orc(self, path):
 
     @since(1.4)
     def jdbc(self, url, table, column=None, lowerBound=None, upperBound=None, numPartitions=None,
-             predicates=None, properties={}):
+             predicates=None, properties=None):
         """
         Construct a :class:`DataFrame` representing the database table accessible
         via JDBC URL `url` named `table` and connection `properties`.
@@ -208,6 +208,8 @@ def jdbc(self, url, table, column=None, lowerBound=None, upperBound=None, numPar
             should be included.
         :return: a DataFrame
         """
+        if properties is None:
+            properties = dict()
         jprop = JavaClass("java.util.Properties", self._sqlContext._sc._gateway._gateway_client)()
         for k in properties:
             jprop.setProperty(k, properties[k])
@@ -427,7 +429,7 @@ def orc(self, path, mode=None, partitionBy=None):
         self._jwrite.orc(path)
 
     @since(1.4)
-    def jdbc(self, url, table, mode=None, properties={}):
+    def jdbc(self, url, table, mode=None, properties=None):
         """Saves the content of the :class:`DataFrame` to a external database table via JDBC.
         .. note:: Don't create too many partitions in parallel on a large cluster;\
         otherwise Spark might crash your external database systems.
@@ -445,6 +447,8 @@ def jdbc(self, url, table, mode=None, properties=None):
             arbitrary string tag/value. Normally at least a "user" and
             "password" property should be included.
""" + if properties is None: + properties = dict() jprop = JavaClass("java.util.Properties", self._sqlContext._sc._gateway._gateway_client)() for k in properties: jprop.setProperty(k, properties[k]) diff --git a/python/pyspark/statcounter.py b/python/pyspark/statcounter.py index 944fa414b0c0..0fee3b209682 100644 --- a/python/pyspark/statcounter.py +++ b/python/pyspark/statcounter.py @@ -30,7 +30,9 @@ class StatCounter(object): - def __init__(self, values=[]): + def __init__(self, values=None): + if values is None: + values = list() self.n = 0 # Running count of our values self.mu = 0.0 # Running mean of our values self.m2 = 0.0 # Running variance numerator (sum of (x - mean)^2) diff --git a/python/pyspark/streaming/kafka.py b/python/pyspark/streaming/kafka.py index 33dd596335b4..dc5b7fd878ae 100644 --- a/python/pyspark/streaming/kafka.py +++ b/python/pyspark/streaming/kafka.py @@ -35,7 +35,7 @@ def utf8_decoder(s): class KafkaUtils(object): @staticmethod - def createStream(ssc, zkQuorum, groupId, topics, kafkaParams={}, + def createStream(ssc, zkQuorum, groupId, topics, kafkaParams=None, storageLevel=StorageLevel.MEMORY_AND_DISK_SER_2, keyDecoder=utf8_decoder, valueDecoder=utf8_decoder): """ @@ -52,6 +52,8 @@ def createStream(ssc, zkQuorum, groupId, topics, kafkaParams={}, :param valueDecoder: A function used to decode value (default is utf8_decoder) :return: A DStream object """ + if kafkaParams is None: + kafkaParams = dict() kafkaParams.update({ "zookeeper.connect": zkQuorum, "group.id": groupId, @@ -77,7 +79,7 @@ def createStream(ssc, zkQuorum, groupId, topics, kafkaParams={}, return stream.map(lambda k_v: (keyDecoder(k_v[0]), valueDecoder(k_v[1]))) @staticmethod - def createDirectStream(ssc, topics, kafkaParams, fromOffsets={}, + def createDirectStream(ssc, topics, kafkaParams, fromOffsets=None, keyDecoder=utf8_decoder, valueDecoder=utf8_decoder): """ .. note:: Experimental @@ -105,6 +107,8 @@ def createDirectStream(ssc, topics, kafkaParams, fromOffsets={}, :param valueDecoder: A function used to decode value (default is utf8_decoder). :return: A DStream object """ + if fromOffsets is None: + fromOffsets = dict() if not isinstance(topics, list): raise TypeError("topics should be list") if not isinstance(kafkaParams, dict): @@ -129,7 +133,7 @@ def createDirectStream(ssc, topics, kafkaParams, fromOffsets={}, return KafkaDStream(stream._jdstream, ssc, stream._jrdd_deserializer) @staticmethod - def createRDD(sc, kafkaParams, offsetRanges, leaders={}, + def createRDD(sc, kafkaParams, offsetRanges, leaders=None, keyDecoder=utf8_decoder, valueDecoder=utf8_decoder): """ .. 
@@ -145,6 +149,8 @@ def createRDD(sc, kafkaParams, offsetRanges, leaders={},
         :param valueDecoder:  A function used to decode value (default is utf8_decoder)
         :return: A RDD object
         """
+        if leaders is None:
+            leaders = dict()
         if not isinstance(kafkaParams, dict):
             raise TypeError("kafkaParams should be dict")
         if not isinstance(offsetRanges, list):

From c0f2308dc236b9eb5287a5c536ea72205a5d9a8b Mon Sep 17 00:00:00 2001
From: MechCoder
Date: Wed, 12 Aug 2015 01:31:26 +0530
Subject: [PATCH 3/5] Create map variables during class instantiation

---
 python/pyspark/ml/param/__init__.py | 14 ++++++++------
 python/pyspark/ml/wrapper.py        | 10 ++++++----
 python/pyspark/rdd.py               |  1 +
 3 files changed, 15 insertions(+), 10 deletions(-)

diff --git a/python/pyspark/ml/param/__init__.py b/python/pyspark/ml/param/__init__.py
index a47ad5798845..eeeac49b2198 100644
--- a/python/pyspark/ml/param/__init__.py
+++ b/python/pyspark/ml/param/__init__.py
@@ -60,14 +60,16 @@ class Params(Identifiable):
 
     __metaclass__ = ABCMeta
 
-    #: internal param map for user-supplied values param map
-    _paramMap = {}
+    def __init__(self):
+        super(Params, self).__init__()
+        #: internal param map for user-supplied values param map
+        self._paramMap = {}
 
-    #: internal param map for default values
-    _defaultParamMap = {}
+        #: internal param map for default values
+        self._defaultParamMap = {}
 
-    #: value returned by :py:func:`params`
-    _params = None
+        #: value returned by :py:func:`params`
+        self._params = None
 
     @property
     def params(self):
diff --git a/python/pyspark/ml/wrapper.py b/python/pyspark/ml/wrapper.py
index 253705bde913..de510be26492 100644
--- a/python/pyspark/ml/wrapper.py
+++ b/python/pyspark/ml/wrapper.py
@@ -45,10 +45,12 @@ class JavaWrapper(Params):
 
     __metaclass__ = ABCMeta
 
-    #: The wrapped Java companion object. Subclasses should initialize
-    #: it properly. The param values in the Java object should be
-    #: synced with the Python wrapper in fit/transform/evaluate/copy.
-    _java_obj = None
+    def __init__(self):
+        super(JavaWrapper, self).__init__()
+        #: The wrapped Java companion object. Subclasses should initialize
+        #: it properly. The param values in the Java object should be
+        #: synced with the Python wrapper in fit/transform/evaluate/copy.
+        self._java_obj = None
 
     @staticmethod
     def _new_java_obj(java_class, *args):
diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index 325cf37c535a..9ef60a7e2c84 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -711,6 +711,7 @@ def pipe(self, command, env=None, checkCode=False):
         """
         if env is None:
             env = dict()
+
         def func(iterator):
             pipe = Popen(
                 shlex.split(command), env=env, stdin=PIPE, stdout=PIPE)

From 4b2c0e86900770d4c52869bbd7029f3968ed32a6 Mon Sep 17 00:00:00 2001
From: MechCoder
Date: Thu, 13 Aug 2015 15:21:17 +0530
Subject: [PATCH 4/5] remove constructor

---
 python/pyspark/ml/wrapper.py | 10 ++++------
 1 file changed, 4 insertions(+), 6 deletions(-)

diff --git a/python/pyspark/ml/wrapper.py b/python/pyspark/ml/wrapper.py
index de510be26492..253705bde913 100644
--- a/python/pyspark/ml/wrapper.py
+++ b/python/pyspark/ml/wrapper.py
@@ -45,12 +45,10 @@ class JavaWrapper(Params):
 
     __metaclass__ = ABCMeta
 
-    def __init__(self):
-        super(JavaWrapper, self).__init__()
-        #: The wrapped Java companion object. Subclasses should initialize
-        #: it properly. The param values in the Java object should be
-        #: synced with the Python wrapper in fit/transform/evaluate/copy.
-        self._java_obj = None
+    #: The wrapped Java companion object. Subclasses should initialize
+    #: it properly. The param values in the Java object should be
+    #: synced with the Python wrapper in fit/transform/evaluate/copy.
+    _java_obj = None
 
     @staticmethod
     def _new_java_obj(java_class, *args):

From 20f2b50688656d20bd6e2ff63e9fe730d4d5807c Mon Sep 17 00:00:00 2001
From: MechCoder
Date: Fri, 14 Aug 2015 12:40:55 +0530
Subject: [PATCH 5/5] last

---
 python/pyspark/ml/pipeline.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/python/pyspark/ml/pipeline.py b/python/pyspark/ml/pipeline.py
index 9889f56cac9e..13cf2b0f7bbd 100644
--- a/python/pyspark/ml/pipeline.py
+++ b/python/pyspark/ml/pipeline.py
@@ -141,7 +141,7 @@ class Pipeline(Estimator):
     @keyword_only
     def __init__(self, stages=None):
         """
-        __init__(self, stages=[])
+        __init__(self, stages=None)
         """
         if stages is None:
             stages = []
@@ -170,7 +170,7 @@ def getStages(self):
     @keyword_only
     def setParams(self, stages=None):
         """
-        setParams(self, stages=[])
+        setParams(self, stages=None)
         Sets params for Pipeline.
         """
         if stages is None:
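
Why these changes matter — a short standalone sketch, not part of the patches above. Python evaluates default argument values once, when the `def` statement runs, so a mutable default such as `{}` or `[]` is shared by every call that omits the argument; likewise, a mutable class attribute such as `_paramMap = {}` is shared by every instance. The `if arg is None: arg = dict()` sentinel idiom used throughout PATCH 1 and 2, and the move of the maps into `__init__` in PATCH 3, both restore per-call and per-instance state. The function and class names below are illustrative only and do not appear in the patches:

    def append_bad(x, acc=[]):
        # acc is created once, at function-definition time,
        # and reused by every call that omits it
        acc.append(x)
        return acc

    append_bad(1)   # [1]
    append_bad(2)   # [1, 2] -- state leaked from the first call

    def append_good(x, acc=None):
        # the None-sentinel idiom applied in PATCH 1 and 2
        if acc is None:
            acc = []  # a fresh list on every call
        acc.append(x)
        return acc

    append_good(1)  # [1]
    append_good(2)  # [2]

    class SharedBad(object):
        cache = {}  # one dict shared by all instances

    class SharedGood(object):
        def __init__(self):
            self.cache = {}  # per-instance dict, as PATCH 3 does for _paramMap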