 from itertools import cycle
 from multiprocessing import Queue, Process
 
-from kafka.common import ProduceRequest
+from kafka.common import ProduceRequest, TopicAndPartition
 from kafka.partitioner import HashedPartitioner
 from kafka.protocol import create_message
 
@@ -20,7 +20,7 @@
 STOP_ASYNC_PRODUCER = -1
 
 
-def _send_upstream(topic, queue, client, batch_time, batch_size,
+def _send_upstream(queue, client, batch_time, batch_size,
                    req_acks, ack_timeout):
     """
     Listen on the queue for a specified number of messages or till
@@ -44,24 +44,27 @@ def _send_upstream(topic, queue, client, batch_time, batch_size,
     # timeout is reached
     while count > 0 and timeout >= 0:
         try:
-            partition, msg = queue.get(timeout=timeout)
+            topic_partition, msg = queue.get(timeout=timeout)
+
         except Empty:
             break
 
         # Check if the controller has requested us to stop
-        if partition == STOP_ASYNC_PRODUCER:
+        if topic_partition == STOP_ASYNC_PRODUCER:
             stop = True
             break
 
         # Adjust the timeout to match the remaining period
         count -= 1
         timeout = send_at - time.time()
-        msgset[partition].append(msg)
+        msgset[topic_partition].append(msg)
 
     # Send collected requests upstream
     reqs = []
-    for partition, messages in msgset.items():
-        req = ProduceRequest(topic, partition, messages)
+    for topic_partition, messages in msgset.items():
+        req = ProduceRequest(topic_partition.topic,
                              topic_partition.partition,
                              messages)
         reqs.append(req)
 
     try:
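
Because the topic is no longer fixed at producer construction, every queue entry now carries a TopicAndPartition, and the background sender groups messages per (topic, partition) pair before building requests. A minimal standalone sketch of that grouping step; the topic names and sample data are purely illustrative, not part of the commit:

    from collections import defaultdict
    from kafka.common import ProduceRequest, TopicAndPartition
    from kafka.protocol import create_message

    # Entries shaped like the ones send_messages() enqueues (illustrative data)
    entries = [
        (TopicAndPartition("events", 0), create_message("a")),
        (TopicAndPartition("events", 1), create_message("b")),
        (TopicAndPartition("metrics", 0), create_message("c")),
    ]

    # Group per topic-partition, mirroring the msgset dict in _send_upstream
    msgset = defaultdict(list)
    for topic_partition, msg in entries:
        msgset[topic_partition].append(msg)

    # One ProduceRequest per topic-partition; topics can be mixed freely
    reqs = [ProduceRequest(tp.topic, tp.partition, messages)
            for tp, messages in msgset.items()]
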
@@ -78,7 +81,6 @@ class Producer(object):
 
     Params:
     client - The Kafka client instance to use
-    topic - The topic for sending messages to
     async - If set to true, the messages are sent asynchronously via another
             thread (process). We will not wait for a response to these
     req_acks - A value indicating the acknowledgements that the server must
@@ -119,8 +121,7 @@ def __init__(self, client, async=False,
         if self.async:
             self.queue = Queue()  # Messages are sent through this queue
             self.proc = Process(target=_send_upstream,
-                                args=(self.topic,
-                                      self.queue,
+                                args=(self.queue,
                                       self.client.copy(),
                                       batch_send_every_t,
                                       batch_send_every_n,
@@ -131,17 +132,18 @@ def __init__(self, client, async=False,
             self.proc.daemon = True
             self.proc.start()
 
-    def send_messages(self, partition, *msg):
+    def send_messages(self, topic, partition, *msg):
         """
         Helper method to send produce requests
         """
         if self.async:
             for m in msg:
-                self.queue.put((partition, create_message(m)))
+                self.queue.put((TopicAndPartition(topic, partition),
+                                create_message(m)))
             resp = []
         else:
             messages = [create_message(m) for m in msg]
-            req = ProduceRequest(self.topic, partition, messages)
+            req = ProduceRequest(topic, partition, messages)
             try:
                 resp = self.client.send_produce_request([req], acks=self.req_acks,
                                                         timeout=self.ack_timeout)
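
With this change the base Producer takes the topic on every send_messages() call rather than once in the constructor, so a single instance can feed several topics. A rough usage sketch, assuming a reachable broker; the connection details and topic names are illustrative, not part of the commit:

    from kafka.client import KafkaClient
    from kafka.producer import Producer

    kafka = KafkaClient("localhost", 9092)  # assumed broker address
    producer = Producer(kafka)

    # When using the base class directly, the partition is given explicitly
    producer.send_messages("my-topic", 0, "some message")
    producer.send_messages("another-topic", 1, "first", "second")
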
@@ -169,7 +171,6 @@ class SimpleProducer(Producer):
 
     Params:
     client - The Kafka client instance to use
-    topic - The topic for sending messages to
     async - If True, the messages are sent asynchronously via another
             thread (process). We will not wait for a response to these
     req_acks - A value indicating the acknowledgements that the server must
@@ -180,27 +181,31 @@ class SimpleProducer(Producer):
     batch_send_every_n - If set, messages are send in batches of this size
     batch_send_every_t - If set, messages are send after this timeout
     """
-    def __init__(self, client, topic, async=False,
+    def __init__(self, client, async=False,
                  req_acks=Producer.ACK_AFTER_LOCAL_WRITE,
                  ack_timeout=Producer.DEFAULT_ACK_TIMEOUT,
                  batch_send=False,
                  batch_send_every_n=BATCH_SEND_MSG_COUNT,
                  batch_send_every_t=BATCH_SEND_DEFAULT_INTERVAL):
-        self.topic = topic
-        client.load_metadata_for_topics(topic)
-        self.next_partition = cycle(client.topic_partitions[topic])
-
+        self.partition_cycles = {}
         super(SimpleProducer, self).__init__(client, async, req_acks,
                                              ack_timeout, batch_send,
                                              batch_send_every_n,
                                              batch_send_every_t)
 
-    def send_messages(self, *msg):
-        partition = self.next_partition.next()
-        return super(SimpleProducer, self).send_messages(partition, *msg)
+    def _next_partition(self, topic):
+        if topic not in self.partition_cycles:
+            if topic not in self.client.topic_partitions:
+                self.client.load_metadata_for_topics(topic)
+            self.partition_cycles[topic] = cycle(self.client.topic_partitions[topic])
+        return self.partition_cycles[topic].next()
+
+    def send_messages(self, topic, *msg):
+        partition = self._next_partition(topic)
+        return super(SimpleProducer, self).send_messages(topic, partition, *msg)
 
     def __repr__(self):
-        return '<SimpleProducer topic=%s, batch=%s>' % (self.topic, self.async)
+        return '<SimpleProducer batch=%s>' % self.async
 
 
 class KeyedProducer(Producer):
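
The SimpleProducer changes above keep one round-robin partition cycle per topic in self.partition_cycles and load topic metadata lazily on first use, so the same instance can interleave sends to different topics. A hedged usage sketch (topic names and broker address are made up):

    from kafka.client import KafkaClient
    from kafka.producer import SimpleProducer

    kafka = KafkaClient("localhost", 9092)  # assumed broker address
    producer = SimpleProducer(kafka)

    # Each topic gets its own partition cycle, created on first send
    producer.send_messages("events", "event 1", "event 2")
    producer.send_messages("metrics", "cpu=0.42")
    producer.send_messages("events", "event 3")  # continues the "events" cycle
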
@@ -209,7 +214,6 @@ class KeyedProducer(Producer):
 
     Args:
     client - The kafka client instance
-    topic - The kafka topic to send messages to
     partitioner - A partitioner class that will be used to get the partition
         to send the message to. Must be derived from Partitioner
     async - If True, the messages are sent asynchronously via another
@@ -220,29 +224,34 @@ class KeyedProducer(Producer):
     batch_send_every_n - If set, messages are send in batches of this size
     batch_send_every_t - If set, messages are send after this timeout
     """
-    def __init__(self, client, topic, partitioner=None, async=False,
+    def __init__(self, client, partitioner=None, async=False,
                  req_acks=Producer.ACK_AFTER_LOCAL_WRITE,
                  ack_timeout=Producer.DEFAULT_ACK_TIMEOUT,
                  batch_send=False,
                  batch_send_every_n=BATCH_SEND_MSG_COUNT,
                  batch_send_every_t=BATCH_SEND_DEFAULT_INTERVAL):
-        self.topic = topic
-        client.load_metadata_for_topics(topic)
-
         if not partitioner:
             partitioner = HashedPartitioner
-
-        self.partitioner = partitioner(client.topic_partitions[topic])
+        self.partitioner_class = partitioner
+        self.partitioners = {}
 
         super(KeyedProducer, self).__init__(client, async, req_acks,
                                             ack_timeout, batch_send,
                                             batch_send_every_n,
                                             batch_send_every_t)
 
-    def send(self, key, msg):
-        partitions = self.client.topic_partitions[self.topic]
-        partition = self.partitioner.partition(key, partitions)
-        return self.send_messages(partition, msg)
+    def _next_partition(self, topic, key):
+        if topic not in self.partitioners:
+            if topic not in self.client.topic_partitions:
+                self.client.load_metadata_for_topics(topic)
+            self.partitioners[topic] = \
+                self.partitioner_class(self.client.topic_partitions[topic])
+        partitioner = self.partitioners[topic]
+        return partitioner.partition(key, self.client.topic_partitions[topic])
+
+    def send(self, topic, key, msg):
+        partition = self._next_partition(topic, key)
+        return self.send_messages(topic, partition, msg)
 
     def __repr__(self):
-        return '<KeyedProducer topic=%s, batch=%s>' % (self.topic, self.async)
+        return '<KeyedProducer batch=%s>' % self.async
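
KeyedProducer likewise builds one partitioner per topic on demand (HashedPartitioner unless another class is passed), so messages with the same key keep hashing to the same partition of a given topic. A rough sketch of the new call shape, again with illustrative names and broker address:

    from kafka.client import KafkaClient
    from kafka.partitioner import HashedPartitioner
    from kafka.producer import KeyedProducer

    kafka = KafkaClient("localhost", 9092)  # assumed broker address
    producer = KeyedProducer(kafka, partitioner=HashedPartitioner)

    # The topic is now part of every send; the key selects the partition
    producer.send("clicks", "user-1", "clicked signup")
    producer.send("clicks", "user-2", "clicked login")
    producer.send("page-views", "user-1", "/home")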