From e50469a8b6336ae99965439943f0502df0f136d8 Mon Sep 17 00:00:00 2001 From: toli kuznets Date: Tue, 12 May 2015 16:17:57 -0700 Subject: [PATCH 1/4] removing py26, pypy, py33, py34 to get CircleCI build to pass --- tox.ini | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tox.ini b/tox.ini index fba7d8e28..d431e3adc 100644 --- a/tox.ini +++ b/tox.ini @@ -1,5 +1,5 @@ [tox] -envlist = lint, py26, py27, pypy, py33, py34, docs +envlist = lint, py27, docs [testenv] deps = From 37c4e901e94c51a3527088f8d834eb678d73860f Mon Sep 17 00:00:00 2001 From: Roberto Gandolfo Hashioka Date: Tue, 12 May 2015 13:54:01 -0700 Subject: [PATCH 2/4] Added upper-bound size limit to the kafka producer Set the ASYNC_QUEUE_MAXSIZE to 65536 --- kafka/producer/base.py | 35 ++++++++++++++++++++++++------ kafka/producer/simple.py | 7 ++++-- test/test_producer.py | 46 ++++++++++++++++++++++++++++++++++++++++ 3 files changed, 79 insertions(+), 9 deletions(-) diff --git a/kafka/producer/base.py b/kafka/producer/base.py index 4bd3de49a..ce58d3879 100644 --- a/kafka/producer/base.py +++ b/kafka/producer/base.py @@ -5,9 +5,9 @@ import time try: - from queue import Empty, Queue + from queue import Empty, Queue, Full except ImportError: - from Queue import Empty, Queue + from Queue import Empty, Queue, Full from collections import defaultdict from threading import Thread, Event @@ -27,6 +27,8 @@ STOP_ASYNC_PRODUCER = -1 +ASYNC_QUEUE_MAXSIZE = 65536 + def _send_upstream(queue, client, codec, batch_time, batch_size, req_acks, ack_timeout, stop_event): @@ -64,7 +66,9 @@ def _send_upstream(queue, client, codec, batch_time, batch_size, # Send collected requests upstream reqs = [] + topics = set() for topic_partition, msg in msgset.items(): + topics.add(topic_partition.topic) messages = create_message_set(msg, codec, key) req = ProduceRequest(topic_partition.topic, topic_partition.partition, @@ -97,6 +101,7 @@ class Producer(object): batch_send: If True, messages are send in batches batch_send_every_n: If set, messages are send in batches of this size batch_send_every_t: If set, messages are send after this timeout + maxsize: sets the upper-bound limit on the number of items that can be placed in the queue """ ACK_NOT_REQUIRED = 0 # No ack is required @@ -111,12 +116,20 @@ def __init__(self, client, async=False, codec=None, batch_send=False, batch_send_every_n=BATCH_SEND_MSG_COUNT, - batch_send_every_t=BATCH_SEND_DEFAULT_INTERVAL): + batch_send_every_t=BATCH_SEND_DEFAULT_INTERVAL, + maxsize=None): if batch_send: async = True - assert batch_send_every_n > 0 - assert batch_send_every_t > 0 + if batch_send_every_n <= 0: + log.exception('Batch send message count lower than zero.') + raise ValueError + if batch_send_every_t <= 0: + log.exception('Batch send message interval lower than zero.') + raise ValueError + if maxsize < 0: + log.exception('Queue size upper bound lower than zero.') + raise ValueError else: batch_send_every_n = 1 batch_send_every_t = 3600 @@ -127,6 +140,9 @@ def __init__(self, client, async=False, self.ack_timeout = ack_timeout self.stopped = False + if maxsize is None: + maxsize = ASYNC_QUEUE_MAXSIZE + if codec is None: codec = CODEC_NONE elif codec not in ALL_CODECS: @@ -138,7 +154,7 @@ def __init__(self, client, async=False, log.warning("async producer does not guarantee message delivery!") log.warning("Current implementation does not retry Failed messages") log.warning("Use at your own risk! (or help improve with a PR!)") - self.queue = Queue() # Messages are sent through this queue + self.queue = Queue(maxsize) # Messages are sent through this queue self.thread_stop_event = Event() self.thread = Thread(target=_send_upstream, args=(self.queue, @@ -200,7 +216,12 @@ def _send_messages(self, topic, partition, *msg, **kwargs): if self.async: for m in msg: - self.queue.put((TopicAndPartition(topic, partition), m, key)) + try: + item = (TopicAndPartition(topic, partition), m, key) + self.queue.put_nowait(item) + except Full: + log.exception('Async queue is full') + raise resp = [] else: messages = create_message_set([(m, key) for m in msg], self.codec, key) diff --git a/kafka/producer/simple.py b/kafka/producer/simple.py index 2699cf2b6..a7152573b 100644 --- a/kafka/producer/simple.py +++ b/kafka/producer/simple.py @@ -24,6 +24,7 @@ class SimpleProducer(Producer): client: The Kafka client instance to use Keyword Arguments: + maxsize: sets the upper-bound limit on the number of items that can be placed in the queue async: If True, the messages are sent asynchronously via another thread (process). We will not wait for a response to these req_acks: A value indicating the acknowledgements that the server must @@ -45,13 +46,15 @@ def __init__(self, client, async=False, batch_send=False, batch_send_every_n=BATCH_SEND_MSG_COUNT, batch_send_every_t=BATCH_SEND_DEFAULT_INTERVAL, - random_start=True): + random_start=True, + maxsize=None): self.partition_cycles = {} self.random_start = random_start super(SimpleProducer, self).__init__(client, async, req_acks, ack_timeout, codec, batch_send, batch_send_every_n, - batch_send_every_t) + batch_send_every_t, + maxsize) def _next_partition(self, topic): if topic not in self.partition_cycles: diff --git a/test/test_producer.py b/test/test_producer.py index f6b3d6a1b..77a19db61 100644 --- a/test/test_producer.py +++ b/test/test_producer.py @@ -2,6 +2,10 @@ import logging +try: + from queue import Full +except ImportError: + from Queue import Full from mock import MagicMock from . import unittest @@ -40,3 +44,45 @@ def partitions(topic): topic = b"test-topic" producer.send_messages(topic, b'hi') assert client.send_produce_request.called + + def test_producer_async_queue_overfilled_batch_send(self): + queue_size = 2 + producer = Producer(MagicMock(), batch_send=True, maxsize=queue_size) + + topic = b'test-topic' + partition = 0 + message = b'test-message' + + with self.assertRaises(Full): + message_list = [message] * (queue_size + 1) + producer.send_messages(topic, partition, *message_list) + self.assertEqual(producer.queue.qsize(), queue_size) + + def test_producer_async_queue_overfilled(self): + queue_size = 2 + producer = Producer(MagicMock(), async=True, maxsize=queue_size) + + topic = b'test-topic' + partition = 0 + message = b'test-message' + + with self.assertRaises(Full): + message_list = [message] * (queue_size + 1) + producer.send_messages(topic, partition, *message_list) + self.assertEqual(producer.queue.qsize(), queue_size) + + def test_producer_async_queue_normal(self): + queue_size = 4 + producer = Producer(MagicMock(), async=True, maxsize=queue_size) + + topic = b'test-topic' + partition = 0 + message = b'test-message' + + acceptable_size = (queue_size / 2 + 1) + + message_list = [message] * acceptable_size + resp = producer.send_messages(topic, partition, *message_list) + self.assertEqual(type(resp), list) + self.assertEqual(producer.queue.qsize(), acceptable_size) + self.assertFalse(producer.queue.full()) From a69a9fc1e353143102123f72531d4671a803472a Mon Sep 17 00:00:00 2001 From: toli kuznets Date: Tue, 12 May 2015 18:13:51 -0700 Subject: [PATCH 3/4] fixing the unit test race condition --- test/test_producer.py | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/test/test_producer.py b/test/test_producer.py index 77a19db61..a116a0293 100644 --- a/test/test_producer.py +++ b/test/test_producer.py @@ -6,9 +6,9 @@ from queue import Full except ImportError: from Queue import Full -from mock import MagicMock +from mock import MagicMock, patch from . import unittest - +from six.moves import xrange from kafka.producer.base import Producer @@ -45,7 +45,8 @@ def partitions(topic): producer.send_messages(topic, b'hi') assert client.send_produce_request.called - def test_producer_async_queue_overfilled_batch_send(self): + @patch('kafka.producer.base._send_upstream') + def test_producer_async_queue_overfilled_batch_send(self, mock): queue_size = 2 producer = Producer(MagicMock(), batch_send=True, maxsize=queue_size) @@ -57,6 +58,8 @@ def test_producer_async_queue_overfilled_batch_send(self): message_list = [message] * (queue_size + 1) producer.send_messages(topic, partition, *message_list) self.assertEqual(producer.queue.qsize(), queue_size) + for _ in xrange(producer.queue.qsize()): + producer.queue.get() def test_producer_async_queue_overfilled(self): queue_size = 2 From ee47bbbad7ce76101f7a07616fa7a9b178baedb3 Mon Sep 17 00:00:00 2001 From: Roberto Gandolfo Hashioka Date: Tue, 12 May 2015 20:43:31 -0700 Subject: [PATCH 4/4] Added missing _send_upstream mock patch and converted the if statements to asserts --- kafka/producer/base.py | 12 +++--------- test/test_producer.py | 5 ++++- 2 files changed, 7 insertions(+), 10 deletions(-) diff --git a/kafka/producer/base.py b/kafka/producer/base.py index ce58d3879..e27496c0c 100644 --- a/kafka/producer/base.py +++ b/kafka/producer/base.py @@ -121,15 +121,9 @@ def __init__(self, client, async=False, if batch_send: async = True - if batch_send_every_n <= 0: - log.exception('Batch send message count lower than zero.') - raise ValueError - if batch_send_every_t <= 0: - log.exception('Batch send message interval lower than zero.') - raise ValueError - if maxsize < 0: - log.exception('Queue size upper bound lower than zero.') - raise ValueError + assert batch_send_every_n > 0 + assert batch_send_every_t > 0 + assert maxsize >= 0 else: batch_send_every_n = 1 batch_send_every_t = 3600 diff --git a/test/test_producer.py b/test/test_producer.py index a116a0293..85ed4a042 100644 --- a/test/test_producer.py +++ b/test/test_producer.py @@ -61,7 +61,8 @@ def test_producer_async_queue_overfilled_batch_send(self, mock): for _ in xrange(producer.queue.qsize()): producer.queue.get() - def test_producer_async_queue_overfilled(self): + @patch('kafka.producer.base._send_upstream') + def test_producer_async_queue_overfilled(self, mock): queue_size = 2 producer = Producer(MagicMock(), async=True, maxsize=queue_size) @@ -73,6 +74,8 @@ def test_producer_async_queue_overfilled(self): message_list = [message] * (queue_size + 1) producer.send_messages(topic, partition, *message_list) self.assertEqual(producer.queue.qsize(), queue_size) + for _ in xrange(producer.queue.qsize()): + producer.queue.get() def test_producer_async_queue_normal(self): queue_size = 4