
Commit fe382a5

Merge pull request #394 from dpkp/cleanups
Cleanups
2 parents 712377a + 81abf09

File tree: 8 files changed, +88 -152 lines changed

kafka/client.py

Lines changed: 23 additions & 22 deletions
@@ -4,8 +4,8 @@
 import functools
 import logging
 import time
-import kafka.common
 
+import kafka.common
 from kafka.common import (TopicAndPartition, BrokerMetadata,
                           ConnectionError, FailedPayloadsError,
                           KafkaTimeoutError, KafkaUnavailableError,
@@ -22,7 +22,7 @@
 
 class KafkaClient(object):
 
-    CLIENT_ID = b"kafka-python"
+    CLIENT_ID = b'kafka-python'
 
     # NOTE: The timeout given to the client should always be greater than the
     # one passed to SimpleConsumer.get_message(), otherwise you can get a
@@ -50,7 +50,7 @@ def __init__(self, hosts, client_id=CLIENT_ID,
     ##################
 
     def _get_conn(self, host, port):
-        "Get or create a connection to a broker using host and port"
+        """Get or create a connection to a broker using host and port"""
        host_key = (host, port)
        if host_key not in self.conns:
            self.conns[host_key] = KafkaConnection(
@@ -111,6 +111,7 @@ def _send_broker_unaware_request(self, payloads, encoder_fn, decoder_fn):
         """
         for (host, port) in self.hosts:
             requestId = self._next_id()
+            log.debug('Request %s: %s', requestId, payloads)
             try:
                 conn = self._get_conn(host, port)
                 request = encoder_fn(client_id=self.client_id,
@@ -119,13 +120,15 @@ def _send_broker_unaware_request(self, payloads, encoder_fn, decoder_fn):
 
                 conn.send(requestId, request)
                 response = conn.recv(requestId)
-                return decoder_fn(response)
+                decoded = decoder_fn(response)
+                log.debug('Response %s: %s', requestId, decoded)
+                return decoded
 
             except Exception:
-                log.exception("Could not send request [%r] to server %s:%i, "
-                              "trying next server" % (requestId, host, port))
+                log.exception('Error sending request [%s] to server %s:%s, '
+                              'trying next server', requestId, host, port)
 
-        raise KafkaUnavailableError("All servers failed to process request")
+        raise KafkaUnavailableError('All servers failed to process request')
 
     def _send_broker_aware_request(self, payloads, encoder_fn, decoder_fn):
         """
@@ -150,9 +153,6 @@ def _send_broker_aware_request(self, payloads, encoder_fn, decoder_fn):
 
         List of response objects in the same order as the supplied payloads
         """
-
-        log.debug("Sending Payloads: %s" % payloads)
-
         # Group the requests by topic+partition
         brokers_for_payloads = []
         payloads_by_broker = collections.defaultdict(list)
@@ -170,6 +170,7 @@ def _send_broker_aware_request(self, payloads, encoder_fn, decoder_fn):
         broker_failures = []
         for broker, payloads in payloads_by_broker.items():
             requestId = self._next_id()
+            log.debug('Request %s to %s: %s', requestId, broker, payloads)
             request = encoder_fn(client_id=self.client_id,
                                  correlation_id=requestId, payloads=payloads)
 
@@ -180,7 +181,7 @@ def _send_broker_aware_request(self, payloads, encoder_fn, decoder_fn):
 
             except ConnectionError as e:
                 broker_failures.append(broker)
-                log.warning("Could not send request [%s] to server %s: %s",
+                log.warning('Could not send request [%s] to server %s: %s',
                             binascii.b2a_hex(request), broker, e)
 
                 for payload in payloads:
@@ -201,15 +202,14 @@ def _send_broker_aware_request(self, payloads, encoder_fn, decoder_fn):
                 response = conn.recv(requestId)
             except ConnectionError as e:
                 broker_failures.append(broker)
-                log.warning("Could not receive response to request [%s] "
-                            "from server %s: %s",
+                log.warning('Could not receive response to request [%s] '
+                            'from server %s: %s',
                             binascii.b2a_hex(request), conn, e)
 
                 for payload in payloads:
                     responses_by_broker[broker].append(FailedPayloadsError(payload))
 
             else:
-
                 for payload_response in decoder_fn(response):
                     responses_by_broker[broker].append(payload_response)
 
@@ -223,7 +223,6 @@ def _send_broker_aware_request(self, payloads, encoder_fn, decoder_fn):
         # Return responses in the same order as provided
         responses_by_payload = [responses_by_broker[broker].pop(0)
                                 for broker in brokers_for_payloads]
-        log.debug('Responses: %s' % responses_by_payload)
         return responses_by_payload
 
     def __repr__(self):
@@ -254,8 +253,11 @@ def close(self):
 
     def copy(self):
         """
-        Create an inactive copy of the client object
-        A reinit() has to be done on the copy before it can be used again
+        Create an inactive copy of the client object, suitable for passing
+        to a separate thread.
+
+        Note that the copied connections are not initialized, so reinit() must
+        be called on the returned copy.
         """
         c = copy.deepcopy(self)
         for key in c.conns:
@@ -297,7 +299,7 @@ def ensure_topic_exists(self, topic, timeout = 30):
 
         while not self.has_metadata_for_topic(topic):
             if time.time() > start_time + timeout:
-                raise KafkaTimeoutError("Unable to create topic {0}".format(topic))
+                raise KafkaTimeoutError('Unable to create topic {0}'.format(topic))
             try:
                 self.load_metadata_for_topics(topic)
             except LeaderNotAvailableError:
@@ -345,8 +347,8 @@ def load_metadata_for_topics(self, *topics):
 
         resp = self.send_metadata_request(topics)
 
-        log.debug("Received new broker metadata: %s", resp.brokers)
-        log.debug("Received new topic metadata: %s", resp.topics)
+        log.debug('Received new broker metadata: %s', resp.brokers)
+        log.debug('Received new topic metadata: %s', resp.topics)
 
         self.brokers = dict([(broker.nodeId, broker)
                              for broker in resp.brokers])
@@ -365,7 +367,7 @@ def load_metadata_for_topics(self, *topics):
                     raise
 
                 # Otherwise, just log a warning
-                log.error("Error loading topic metadata for %s: %s", topic, type(e))
+                log.error('Error loading topic metadata for %s: %s', topic, type(e))
                 continue
 
             self.topic_partitions[topic] = {}
@@ -406,7 +408,6 @@ def load_metadata_for_topics(self, *topics):
 
     def send_metadata_request(self, payloads=[], fail_on_error=True,
                               callback=None):
-
         encoder = KafkaProtocol.encode_metadata_request
         decoder = KafkaProtocol.decode_metadata_response
 
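Beyond the quote-style normalization, the logging changes in this file replace eager percent-interpolation with the logger's deferred formatting, so expensive reprs are only computed when the level is enabled. A minimal standalone sketch of the difference (the payloads value is a stand-in, not kafka-python data):

    import logging

    logging.basicConfig(level=logging.INFO)  # DEBUG records are discarded
    log = logging.getLogger('example')

    payloads = list(range(10000))  # stand-in for a large request payload

    # Eager: the full message string is built before debug() is called,
    # even though the record is thrown away at INFO level.
    log.debug('Sending payloads: %s' % payloads)

    # Deferred: logging formats the message only if DEBUG is enabled,
    # so the expensive repr of payloads is skipped entirely.
    log.debug('Sending payloads: %s', payloads)

This is also why the new log.exception() call passes requestId, host, and port as arguments instead of interpolating them inline.
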
kafka/conn.py

Lines changed: 5 additions & 3 deletions
@@ -161,9 +161,11 @@ def recv(self, request_id):
 
     def copy(self):
         """
-        Create an inactive copy of the connection object
-        A reinit() has to be done on the copy before it can be used again
-        return a new KafkaConnection object
+        Create an inactive copy of the connection object, suitable for
+        passing to a background thread.
+
+        The returned copy is not connected; you must call reinit() before
+        using.
         """
         c = copy.deepcopy(self)
         # Python 3 doesn't copy custom attributes of the threadlocal subclass
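
The rewritten docstring spells out a copy-then-reinit pattern. A minimal sketch of that usage, assuming a reachable broker; the host, port, and worker function are placeholders rather than part of this commit:

    import threading

    from kafka.conn import KafkaConnection

    conn = KafkaConnection('localhost', 9092)  # placeholder broker address

    def worker(c):
        # The copy arrives unconnected: reinit() opens a fresh socket
        # owned by this thread alone.
        c.reinit()
        # ... use c.send(...) / c.recv(...) as usual ...

    # copy() deep-copies the object, so the original's socket is never shared.
    t = threading.Thread(target=worker, args=(conn.copy(),))
    t.start()
    t.join()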

kafka/consumer/multiprocess.py

Lines changed: 7 additions & 8 deletions
@@ -1,22 +1,21 @@
 from __future__ import absolute_import
 
-import logging
-import time
-
 from collections import namedtuple
+import logging
 from multiprocessing import Process, Manager as MPManager
-
 try:
-    from Queue import Empty, Full
-except ImportError:  # python 2
-    from queue import Empty, Full
+    from Queue import Empty, Full  # python 2
+except ImportError:
+    from queue import Empty, Full  # python 3
+import time
 
 from .base import (
+    Consumer,
     AUTO_COMMIT_MSG_COUNT, AUTO_COMMIT_INTERVAL,
     NO_MESSAGES_WAIT_TIME_SECONDS,
     FULL_QUEUE_WAIT_TIME_SECONDS
 )
-from .simple import Consumer, SimpleConsumer
+from .simple import SimpleConsumer
 
 
 log = logging.getLogger(__name__)
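
For context on the try/except import guard: the standard-library queue module was renamed between Python major versions, so the code probes for the old name first and falls back to the new one. The idiom in isolation:

    # Queue (capitalized) is the Python 2 module name; queue is the Python 3
    # rename. Whichever import succeeds, Empty and Full are bound to the
    # exceptions raised by blocking get()/put() calls on timeout.
    try:
        from Queue import Empty, Full   # Python 2
    except ImportError:
        from queue import Empty, Full   # Python 3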

kafka/consumer/simple.py

Lines changed: 13 additions & 14 deletions
@@ -2,25 +2,18 @@
 
 try:
     from itertools import zip_longest as izip_longest, repeat  # pylint: disable-msg=E0611
-except ImportError:  # python 2
-    from itertools import izip_longest as izip_longest, repeat
+except ImportError:
+    from itertools import izip_longest as izip_longest, repeat  # python 2
 import logging
+try:
+    from Queue import Empty, Queue  # python 2
+except ImportError:
+    from queue import Empty, Queue  # python 3
+import sys
 import time
 
 import six
-import sys
-
-try:
-    from Queue import Empty, Queue
-except ImportError:  # python 2
-    from queue import Empty, Queue
 
-from kafka.common import (
-    FetchRequest, OffsetRequest,
-    ConsumerFetchSizeTooSmall, ConsumerNoMoreData,
-    UnknownTopicOrPartitionError, NotLeaderForPartitionError,
-    OffsetOutOfRangeError, FailedPayloadsError, check_error
-)
 from .base import (
     Consumer,
     FETCH_DEFAULT_BLOCK_TIMEOUT,
@@ -33,6 +26,12 @@
     ITER_TIMEOUT_SECONDS,
     NO_MESSAGES_WAIT_TIME_SECONDS
 )
+from ..common import (
+    FetchRequest, OffsetRequest,
+    ConsumerFetchSizeTooSmall, ConsumerNoMoreData,
+    UnknownTopicOrPartitionError, NotLeaderForPartitionError,
+    OffsetOutOfRangeError, FailedPayloadsError, check_error
+)
 
 
 log = logging.getLogger(__name__)
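
The regrouped imports also swap the absolute kafka.common reference for an explicit relative one. A small illustration of how the two spellings line up, given this repository's package layout:

    # Inside kafka/consumer/simple.py:
    from .base import Consumer          # sibling module: kafka/consumer/base.py
    from ..common import FetchRequest   # parent package: kafka/common.py

    # Equivalent absolute spellings, which break if the top-level
    # package is ever vendored under another name:
    # from kafka.consumer.base import Consumer
    # from kafka.common import FetchRequest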

kafka/producer/base.py

Lines changed: 2 additions & 0 deletions
@@ -206,6 +206,8 @@ class Producer(object):
 
     Arguments:
         client (KafkaClient): instance to use for broker communications.
+            If async=True, the background thread will use client.copy(),
+            which is expected to return a thread-safe object.
         codec (kafka.protocol.ALL_CODECS): compression codec to use.
         req_acks (int, optional): A value indicating the acknowledgements that
             the server must receive before responding to the request,
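
The added docstring lines record a thread-safety contract: with async=True, the producer's background thread talks to brokers through client.copy() rather than sharing the caller's sockets. A hedged usage sketch from this era of the API (SimpleProducer, the broker address, and the topic are illustrative; note that the async keyword argument predates Python 3.7, where async became a reserved word):

    from kafka import KafkaClient, SimpleProducer

    client = KafkaClient('localhost:9092')  # placeholder broker

    # async=True: send_messages() only enqueues; a background thread
    # delivers batches through client.copy(), so that copy must work
    # independently of this thread's sockets after reinit().
    producer = SimpleProducer(client, async=True)
    producer.send_messages(b'my-topic', b'hello from the queue')

    producer.stop()  # flush pending messages and join the sender thread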

kafka/producer/keyed.py

Lines changed: 10 additions & 43 deletions
@@ -3,14 +3,10 @@
 import logging
 import warnings
 
-from kafka.partitioner import HashedPartitioner
-from kafka.util import kafka_bytestring
+from .base import Producer
+from ..partitioner import HashedPartitioner
+from ..util import kafka_bytestring
 
-from .base import (
-    Producer, BATCH_SEND_DEFAULT_INTERVAL,
-    BATCH_SEND_MSG_COUNT, ASYNC_QUEUE_MAXSIZE, ASYNC_QUEUE_PUT_TIMEOUT,
-    ASYNC_RETRY_LIMIT, ASYNC_RETRY_BACKOFF_MS, ASYNC_RETRY_ON_TIMEOUTS
-)
 
 log = logging.getLogger(__name__)
 
@@ -19,46 +15,17 @@ class KeyedProducer(Producer):
     """
     A producer which distributes messages to partitions based on the key
 
-    Arguments:
-        client: The kafka client instance
+    See Producer class for Arguments
 
-    Keyword Arguments:
+    Additional Arguments:
         partitioner: A partitioner class that will be used to get the partition
-            to send the message to. Must be derived from Partitioner
-        async: If True, the messages are sent asynchronously via another
-            thread (process). We will not wait for a response to these
-        ack_timeout: Value (in milliseconds) indicating a timeout for waiting
-            for an acknowledgement
-        batch_send: If True, messages are send in batches
-        batch_send_every_n: If set, messages are send in batches of this size
-        batch_send_every_t: If set, messages are send after this timeout
+            to send the message to. Must be derived from Partitioner.
+            Defaults to HashedPartitioner.
     """
-    def __init__(self, client, partitioner=None, async=False,
-                 req_acks=Producer.ACK_AFTER_LOCAL_WRITE,
-                 ack_timeout=Producer.DEFAULT_ACK_TIMEOUT,
-                 codec=None,
-                 batch_send=False,
-                 batch_send_every_n=BATCH_SEND_MSG_COUNT,
-                 batch_send_every_t=BATCH_SEND_DEFAULT_INTERVAL,
-                 async_retry_limit=ASYNC_RETRY_LIMIT,
-                 async_retry_backoff_ms=ASYNC_RETRY_BACKOFF_MS,
-                 async_retry_on_timeouts=ASYNC_RETRY_ON_TIMEOUTS,
-                 async_queue_maxsize=ASYNC_QUEUE_MAXSIZE,
-                 async_queue_put_timeout=ASYNC_QUEUE_PUT_TIMEOUT):
-        if not partitioner:
-            partitioner = HashedPartitioner
-        self.partitioner_class = partitioner
+    def __init__(self, *args, **kwargs):
+        self.partitioner_class = kwargs.pop('partitioner', HashedPartitioner)
         self.partitioners = {}
-
-        super(KeyedProducer, self).__init__(client, req_acks, ack_timeout,
-                                            codec, async, batch_send,
-                                            batch_send_every_n,
-                                            batch_send_every_t,
-                                            async_retry_limit,
-                                            async_retry_backoff_ms,
-                                            async_retry_on_timeouts,
-                                            async_queue_maxsize,
-                                            async_queue_put_timeout)
+        super(KeyedProducer, self).__init__(*args, **kwargs)
 
     def _next_partition(self, topic, key):
         if topic not in self.partitioners:
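
With the constructor collapsed to *args/**kwargs, partitioner is popped out of kwargs and everything else is forwarded verbatim to Producer.__init__. A brief sketch of the resulting call site (broker address, topic, and keys are placeholders):

    from kafka import KafkaClient, KeyedProducer
    from kafka.partitioner import RoundRobinPartitioner

    client = KafkaClient('localhost:9092')  # placeholder broker

    # 'partitioner' is consumed by KeyedProducer itself; any other keyword
    # (req_acks, ack_timeout, codec, batch_send_every_n, ...) now passes
    # straight through to the Producer base class.
    producer = KeyedProducer(client, partitioner=RoundRobinPartitioner)
    producer.send_messages(b'my-topic', b'user-1', b'first message')
    producer.send_messages(b'my-topic', b'user-2', b'another message')

One consequence of the kwargs.pop approach: partitioner must now be passed by keyword, since any extra positional argument is interpreted by Producer instead.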
