@@ -35,7 +35,7 @@ def utf8_decoder(s):
3535class KafkaUtils (object ):
3636
3737 @staticmethod
38- def createStream (ssc , zkQuorum , groupId , topics , kafkaParams = {} ,
38+ def createStream (ssc , zkQuorum , groupId , topics , kafkaParams = None ,
3939 storageLevel = StorageLevel .MEMORY_AND_DISK_SER_2 ,
4040 keyDecoder = utf8_decoder , valueDecoder = utf8_decoder ):
4141 """
@@ -52,6 +52,8 @@ def createStream(ssc, zkQuorum, groupId, topics, kafkaParams={},
5252 :param valueDecoder: A function used to decode value (default is utf8_decoder)
5353 :return: A DStream object
5454 """
55+ if kafkaParams is None :
56+ kafkaParams = dict ()
5557 kafkaParams .update ({
5658 "zookeeper.connect" : zkQuorum ,
5759 "group.id" : groupId ,
@@ -77,7 +79,7 @@ def createStream(ssc, zkQuorum, groupId, topics, kafkaParams={},
7779 return stream .map (lambda k_v : (keyDecoder (k_v [0 ]), valueDecoder (k_v [1 ])))
7880
7981 @staticmethod
80- def createDirectStream (ssc , topics , kafkaParams , fromOffsets = {} ,
82+ def createDirectStream (ssc , topics , kafkaParams , fromOffsets = None ,
8183 keyDecoder = utf8_decoder , valueDecoder = utf8_decoder ):
8284 """
8385 .. note:: Experimental
@@ -105,6 +107,8 @@ def createDirectStream(ssc, topics, kafkaParams, fromOffsets={},
105107 :param valueDecoder: A function used to decode value (default is utf8_decoder).
106108 :return: A DStream object
107109 """
110+ if fromOffsets is None :
111+ fromOffsets = dict ()
108112 if not isinstance (topics , list ):
109113 raise TypeError ("topics should be list" )
110114 if not isinstance (kafkaParams , dict ):
@@ -129,7 +133,7 @@ def createDirectStream(ssc, topics, kafkaParams, fromOffsets={},
129133 return KafkaDStream (stream ._jdstream , ssc , stream ._jrdd_deserializer )
130134
131135 @staticmethod
132- def createRDD (sc , kafkaParams , offsetRanges , leaders = {} ,
136+ def createRDD (sc , kafkaParams , offsetRanges , leaders = None ,
133137 keyDecoder = utf8_decoder , valueDecoder = utf8_decoder ):
134138 """
135139 .. note:: Experimental
@@ -145,6 +149,8 @@ def createRDD(sc, kafkaParams, offsetRanges, leaders={},
145149 :param valueDecoder: A function used to decode value (default is utf8_decoder)
146150 :return: A RDD object
147151 """
152+ if leaders is None :
153+ leaders = dict ()
148154 if not isinstance (kafkaParams , dict ):
149155 raise TypeError ("kafkaParams should be dict" )
150156 if not isinstance (offsetRanges , list ):
0 commit comments