@@ -33,7 +33,7 @@ def utf8_decoder(s):
3333class KafkaUtils (object ):
3434
3535 @staticmethod
36- def createStream (ssc , zkQuorum , groupId , topics , kafkaParams = {} ,
36+ def createStream (ssc , zkQuorum , groupId , topics , kafkaParams = None ,
3737 storageLevel = StorageLevel .MEMORY_AND_DISK_SER_2 ,
3838 keyDecoder = utf8_decoder , valueDecoder = utf8_decoder ):
3939 """
@@ -50,6 +50,8 @@ def createStream(ssc, zkQuorum, groupId, topics, kafkaParams={},
5050 :param valueDecoder: A function used to decode value (default is utf8_decoder)
5151 :return: A DStream object
5252 """
53+ if kafkaParams is None :
54+ kafkaParams = dict ()
5355 kafkaParams .update ({
5456 "zookeeper.connect" : zkQuorum ,
5557 "group.id" : groupId ,
@@ -75,7 +77,7 @@ def createStream(ssc, zkQuorum, groupId, topics, kafkaParams={},
7577 return stream .map (lambda k_v : (keyDecoder (k_v [0 ]), valueDecoder (k_v [1 ])))
7678
7779 @staticmethod
78- def createDirectStream (ssc , topics , kafkaParams , fromOffsets = {} ,
80+ def createDirectStream (ssc , topics , kafkaParams , fromOffsets = None ,
7981 keyDecoder = utf8_decoder , valueDecoder = utf8_decoder ):
8082 """
8183 .. note:: Experimental
@@ -103,6 +105,8 @@ def createDirectStream(ssc, topics, kafkaParams, fromOffsets={},
103105 :param valueDecoder: A function used to decode value (default is utf8_decoder).
104106 :return: A DStream object
105107 """
108+ if fromOffsets is None :
109+ fromOffsets = dict ()
106110 if not isinstance (topics , list ):
107111 raise TypeError ("topics should be list" )
108112 if not isinstance (kafkaParams , dict ):
@@ -126,7 +130,7 @@ def createDirectStream(ssc, topics, kafkaParams, fromOffsets={},
126130 return stream .map (lambda k_v : (keyDecoder (k_v [0 ]), valueDecoder (k_v [1 ])))
127131
128132 @staticmethod
129- def createRDD (sc , kafkaParams , offsetRanges , leaders = {} ,
133+ def createRDD (sc , kafkaParams , offsetRanges , leaders = None ,
130134 keyDecoder = utf8_decoder , valueDecoder = utf8_decoder ):
131135 """
132136 .. note:: Experimental
@@ -142,6 +146,8 @@ def createRDD(sc, kafkaParams, offsetRanges, leaders={},
142146 :param valueDecoder: A function used to decode value (default is utf8_decoder)
143147 :return: A RDD object
144148 """
149+ if leaders is None :
150+ leaders = dict ()
145151 if not isinstance (kafkaParams , dict ):
146152 raise TypeError ("kafkaParams should be dict" )
147153 if not isinstance (offsetRanges , list ):
0 commit comments