4 changes: 3 additions & 1 deletion python/pyspark/ml/evaluation.py
@@ -45,7 +45,7 @@ def _evaluate(self, dataset):
"""
raise NotImplementedError()

def evaluate(self, dataset, params={}):
def evaluate(self, dataset, params=None):
"""
Evaluates the output with optional parameters.

@@ -55,6 +55,8 @@ def evaluate(self, dataset, params={}):
params
:return: metric
"""
if params is None:
params = dict()
if isinstance(params, dict):
if params:
return self.copy(params)._evaluate(dataset)
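The pattern above recurs throughout this PR: a mutable default such as `params={}` is evaluated once, when the function is defined, so every call that omits the argument shares (and can accidentally mutate) the same dict. Below is a minimal standalone sketch of the pitfall and of the `None`-sentinel fix; the function names are hypothetical and not part of the PySpark API.

# Hypothetical illustration of the shared-default pitfall; not part of the patch.
def evaluate_buggy(dataset, params={}):
    params.setdefault("metric", "rmse")      # mutates the one shared default dict
    return params

evaluate_buggy("a")["metric"] = "areaUnderROC"
print(evaluate_buggy("b"))                   # {'metric': 'areaUnderROC'} leaks from the first call

def evaluate_fixed(dataset, params=None):
    if params is None:
        params = {}                          # fresh dict on every call
    params.setdefault("metric", "rmse")
    return params

print(evaluate_fixed("b"))                   # {'metric': 'rmse'} every time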
26 changes: 17 additions & 9 deletions python/pyspark/ml/param/__init__.py
@@ -60,14 +60,16 @@ class Params(Identifiable):

__metaclass__ = ABCMeta

#: internal param map for user-supplied values param map
_paramMap = {}
def __init__(self):
super(Params, self).__init__()
#: internal param map for user-supplied values param map
self._paramMap = {}

#: internal param map for default values
_defaultParamMap = {}
#: internal param map for default values
self._defaultParamMap = {}

#: value returned by :py:func:`params`
_params = None
#: value returned by :py:func:`params`
self._params = None

@property
def params(self):
@@ -155,7 +157,7 @@ def getOrDefault(self, param):
else:
return self._defaultParamMap[param]

def extractParamMap(self, extra={}):
def extractParamMap(self, extra=None):
"""
Extracts the embedded default param values and user-supplied
values, and then merges them with extra values from input into
@@ -165,12 +167,14 @@ def extractParamMap(self, extra={}):
:param extra: extra param values
:return: merged param map
"""
if extra is None:
extra = dict()
paramMap = self._defaultParamMap.copy()
paramMap.update(self._paramMap)
paramMap.update(extra)
return paramMap

def copy(self, extra={}):
def copy(self, extra=None):
"""
Creates a copy of this instance with the same uid and some
extra params. The default implementation creates a
@@ -181,6 +185,8 @@ def copy(self, extra={}):
:param extra: Extra parameters to copy to the new instance
:return: Copy of this instance
"""
if extra is None:
extra = dict()
that = copy.copy(self)
that._paramMap = self.extractParamMap(extra)
return that
@@ -233,14 +239,16 @@ def _setDefault(self, **kwargs):
self._defaultParamMap[getattr(self, param)] = value
return self

def _copyValues(self, to, extra={}):
def _copyValues(self, to, extra=None):
"""
Copies param values from this instance to another instance for
params shared by them.
:param to: the target instance
:param extra: extra params to be copied
:return: the target instance with param values copied
"""
if extra is None:
extra = dict()
paramMap = self.extractParamMap(extra)
for p in self.params:
if p in paramMap and to.hasParam(p.name):
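The `Params` change addresses a related but distinct problem: `_paramMap = {}` in the class body creates one dict on the class object, shared by every instance that mutates it in place, while assigning `self._paramMap = {}` in `__init__` gives each instance its own map. A rough standalone sketch of the difference follows; the class names are hypothetical, not the real `Params`.

# Hypothetical illustration; not part of the patch.
class SharedState(object):
    settings = {}                    # one class-level dict shared by all instances

class PerInstanceState(object):
    def __init__(self):
        self.settings = {}           # a fresh dict per instance

a, b = SharedState(), SharedState()
a.settings["k"] = 1
print(b.settings)                    # {'k': 1} - b observes a's mutation

c, d = PerInstanceState(), PerInstanceState()
c.settings["k"] = 1
print(d.settings)                    # {} - instances stay isolated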
4 changes: 2 additions & 2 deletions python/pyspark/ml/pipeline.py
@@ -141,7 +141,7 @@ class Pipeline(Estimator):
@keyword_only
def __init__(self, stages=None):
"""
__init__(self, stages=[])
__init__(self, stages=None)
"""
if stages is None:
stages = []
@@ -170,7 +170,7 @@ def getStages(self):
@keyword_only
def setParams(self, stages=None):
"""
setParams(self, stages=[])
setParams(self, stages=None)
Sets params for Pipeline.
"""
if stages is None:
8 changes: 6 additions & 2 deletions python/pyspark/ml/tuning.py
@@ -227,7 +227,9 @@ def _fit(self, dataset):
bestModel = est.fit(dataset, epm[bestIndex])
return CrossValidatorModel(bestModel)

def copy(self, extra={}):
def copy(self, extra=None):
if extra is None:
extra = dict()
newCV = Params.copy(self, extra)
if self.isSet(self.estimator):
newCV.setEstimator(self.getEstimator().copy(extra))
@@ -250,7 +252,7 @@ def __init__(self, bestModel):
def _transform(self, dataset):
return self.bestModel.transform(dataset)

def copy(self, extra={}):
def copy(self, extra=None):
"""
Creates a copy of this instance with a randomly generated uid
and some extra params. This copies the underlying bestModel,
@@ -259,6 +261,8 @@ def copy(self, extra={}):
:param extra: Extra parameters to copy to the new instance
:return: Copy of this instance
"""
if extra is None:
extra = dict()
return CrossValidatorModel(self.bestModel.copy(extra))


5 changes: 4 additions & 1 deletion python/pyspark/rdd.py
@@ -700,7 +700,7 @@ def groupBy(self, f, numPartitions=None):
return self.map(lambda x: (f(x), x)).groupByKey(numPartitions)

@ignore_unicode_prefix
def pipe(self, command, env={}, checkCode=False):
def pipe(self, command, env=None, checkCode=False):
"""
Return an RDD created by piping elements to a forked external process.

@@ -709,6 +709,9 @@ def pipe(self, command, env={}, checkCode=False):

:param checkCode: whether or not to check the return value of the shell command.
"""
if env is None:
env = dict()

def func(iterator):
pipe = Popen(
shlex.split(command), env=env, stdin=PIPE, stdout=PIPE)
8 changes: 6 additions & 2 deletions python/pyspark/sql/readwriter.py
@@ -182,7 +182,7 @@ def orc(self, path):

@since(1.4)
def jdbc(self, url, table, column=None, lowerBound=None, upperBound=None, numPartitions=None,
predicates=None, properties={}):
predicates=None, properties=None):
"""
Construct a :class:`DataFrame` representing the database table accessible
via JDBC URL `url` named `table` and connection `properties`.
@@ -208,6 +208,8 @@ def jdbc(self, url, table, column=None, lowerBound=None, upperBound=None, numPar
should be included.
:return: a DataFrame
"""
if properties is None:
properties = dict()
jprop = JavaClass("java.util.Properties", self._sqlContext._sc._gateway._gateway_client)()
for k in properties:
jprop.setProperty(k, properties[k])
@@ -427,7 +429,7 @@ def orc(self, path, mode=None, partitionBy=None):
self._jwrite.orc(path)

@since(1.4)
def jdbc(self, url, table, mode=None, properties={}):
def jdbc(self, url, table, mode=None, properties=None):
"""Saves the content of the :class:`DataFrame` to a external database table via JDBC.

.. note:: Don't create too many partitions in parallel on a large cluster;\
@@ -445,6 +447,8 @@ def jdbc(self, url, table, mode=None, properties={}):
arbitrary string tag/value. Normally at least a
"user" and "password" property should be included.
"""
if properties is None:
properties = dict()
jprop = JavaClass("java.util.Properties", self._sqlContext._sc._gateway._gateway_client)()
for k in properties:
jprop.setProperty(k, properties[k])
4 changes: 3 additions & 1 deletion python/pyspark/statcounter.py
@@ -30,7 +30,9 @@

class StatCounter(object):

def __init__(self, values=[]):
def __init__(self, values=None):
if values is None:
values = list()
self.n = 0 # Running count of our values
self.mu = 0.0 # Running mean of our values
self.m2 = 0.0 # Running variance numerator (sum of (x - mean)^2)
12 changes: 9 additions & 3 deletions python/pyspark/streaming/kafka.py
@@ -35,7 +35,7 @@ def utf8_decoder(s):
class KafkaUtils(object):

@staticmethod
def createStream(ssc, zkQuorum, groupId, topics, kafkaParams={},
def createStream(ssc, zkQuorum, groupId, topics, kafkaParams=None,
storageLevel=StorageLevel.MEMORY_AND_DISK_SER_2,
keyDecoder=utf8_decoder, valueDecoder=utf8_decoder):
"""
@@ -52,6 +52,8 @@ def createStream(ssc, zkQuorum, groupId, topics, kafkaParams={},
:param valueDecoder: A function used to decode value (default is utf8_decoder)
:return: A DStream object
"""
if kafkaParams is None:
kafkaParams = dict()
kafkaParams.update({
"zookeeper.connect": zkQuorum,
"group.id": groupId,
@@ -77,7 +79,7 @@ def createStream(ssc, zkQuorum, groupId, topics, kafkaParams={},
return stream.map(lambda k_v: (keyDecoder(k_v[0]), valueDecoder(k_v[1])))

@staticmethod
def createDirectStream(ssc, topics, kafkaParams, fromOffsets={},
def createDirectStream(ssc, topics, kafkaParams, fromOffsets=None,
keyDecoder=utf8_decoder, valueDecoder=utf8_decoder):
"""
.. note:: Experimental
@@ -105,6 +107,8 @@ def createDirectStream(ssc, topics, kafkaParams, fromOffsets={},
:param valueDecoder: A function used to decode value (default is utf8_decoder).
:return: A DStream object
"""
if fromOffsets is None:
fromOffsets = dict()
if not isinstance(topics, list):
raise TypeError("topics should be list")
if not isinstance(kafkaParams, dict):
@@ -129,7 +133,7 @@ def createDirectStream(ssc, topics, kafkaParams, fromOffsets={},
return KafkaDStream(stream._jdstream, ssc, stream._jrdd_deserializer)

@staticmethod
def createRDD(sc, kafkaParams, offsetRanges, leaders={},
def createRDD(sc, kafkaParams, offsetRanges, leaders=None,
keyDecoder=utf8_decoder, valueDecoder=utf8_decoder):
"""
.. note:: Experimental
@@ -145,6 +149,8 @@ def createRDD(sc, kafkaParams, offsetRanges, leaders={},
:param valueDecoder: A function used to decode value (default is utf8_decoder)
:return: A RDD object
"""
if leaders is None:
leaders = dict()
if not isinstance(kafkaParams, dict):
raise TypeError("kafkaParams should be dict")
if not isinstance(offsetRanges, list):