File tree Expand file tree Collapse file tree 1 file changed +6
-3
lines changed Expand file tree Collapse file tree 1 file changed +6
-3
lines changed Original file line number Diff line number Diff line change 66 from queue import Full
77except ImportError :
88 from Queue import Full
9- from mock import MagicMock
9+ from mock import MagicMock , patch
1010from . import unittest
11-
11+ from six . moves import xrange
1212from kafka .producer .base import Producer
1313
1414
@@ -45,7 +45,8 @@ def partitions(topic):
4545 producer .send_messages (topic , b'hi' )
4646 assert client .send_produce_request .called
4747
48- def test_producer_async_queue_overfilled_batch_send (self ):
48+ @patch ('kafka.producer.base._send_upstream' )
49+ def test_producer_async_queue_overfilled_batch_send (self , mock ):
4950 queue_size = 2
5051 producer = Producer (MagicMock (), batch_send = True , maxsize = queue_size )
5152
@@ -57,6 +58,8 @@ def test_producer_async_queue_overfilled_batch_send(self):
5758 message_list = [message ] * (queue_size + 1 )
5859 producer .send_messages (topic , partition , * message_list )
5960 self .assertEqual (producer .queue .qsize (), queue_size )
61+ for _ in xrange (producer .queue .qsize ()):
62+ producer .queue .get ()
6063
6164 def test_producer_async_queue_overfilled (self ):
6265 queue_size = 2
You can’t perform that action at this time.
0 commit comments