Merged
31 commits
60ccb4d  Allow customizing socket timeouts. (rdiomar, Dec 19, 2013)
0f2b08d  Read the correct number of bytes from kafka. (rdiomar, Dec 19, 2013)
8c8ca5f  * Guarantee reading the expected number of bytes from the socket ever… (rdiomar, Dec 19, 2013)
4d6bafa  Allow None timeout in FetchContext even if block is False (rdiomar, Dec 19, 2013)
5dd8d81  Reset consumer fields to original values rather than defaults in Fetc… (rdiomar, Dec 19, 2013)
0c7cf25  SimpleConsumer flow changes: (rdiomar, Dec 19, 2013)
b68523f  Remove SimpleConsumer queue size limit since it can cause the iterator (rdiomar, Dec 19, 2013)
450faeb  Add buffer_size param description to docstring (rdiomar, Dec 19, 2013)
dc4198b  Add iter_timeout option to SimpleConsumer. If not None, it causes the… (rdiomar, Dec 19, 2013)
c1ba510  Add comments and maintain 80 character line limit (rdiomar, Dec 19, 2013)
93b6579  Add and fix comments to protocol.py (rdiomar, Jan 3, 2014)
009ed92  Add note about questionable error handling while decoding messages. (rdiomar, Jan 3, 2014)
b6b1ba0  Fix unit tests. (rdiomar, Jan 3, 2014)
99b561d  Style fix for imports (rdiomar, Jan 3, 2014)
81d001b  Fix seek offset deltas (rdiomar, Jan 3, 2014)
d1e4fd2  Raise a ConnectionError when a socket.error is raised when receiving … (rdiomar, Jan 3, 2014)
8540f1f  Fix client error handling (rdiomar, Jan 3, 2014)
bbd90e1  Add a limit to fetch buffer size, and actually retry requests when fe… (rdiomar, Jan 3, 2014)
6d2b28a  Handle starting/stopping Kafka brokers that are already started/stopp… (rdiomar, Jan 3, 2014)
5721854  Remove unnecessary brackets (rdiomar, Jan 4, 2014)
59f884c  Fix client and consumer params in integration tests (rdiomar, Jan 4, 2014)
d736d0b  Add tests for limited and unlimited consumer max_buffer_size (rdiomar, Jan 4, 2014)
c11ff04  Make kafka brokers per-test in failover integration tests (rdiomar, Jan 6, 2014)
e0c45ff  Add object type and ID to message prefix in fixtures output for easie… (rdiomar, Jan 6, 2014)
e5a5477  Use the same timeout when reinitializing a connection (rdiomar, Jan 8, 2014)
b4c20ac  Handle dirty flag in conn.recv() (rdiomar, Jan 8, 2014)
2077653  Remove unnecessary method (rdiomar, Jan 8, 2014)
f333e91  Skip snappy/gzip tests if they're not available (rdiomar, Jan 8, 2014)
9cbe45d  Some cleanup and easier to read test fixture output (rdiomar, Jan 8, 2014)
317c848  Change BufferUnderflowError to ConnectionError in conn._read_bytes() (rdiomar, Jan 8, 2014)
a0c7141  Change log.error() back to log.exception() (rdiomar, Jan 8, 2014)
31 changes: 21 additions & 10 deletions kafka/client.py
@@ -3,11 +3,12 @@
from functools import partial
from itertools import count
import logging
import socket
import time

from kafka.common import ErrorMapping, TopicAndPartition
from kafka.common import ConnectionError, FailedPayloadsException
from kafka.common import (
ErrorMapping, TopicAndPartition, ConnectionError,
FailedPayloadsException
)
from kafka.conn import KafkaConnection
from kafka.protocol import KafkaProtocol

@@ -19,12 +20,12 @@ class KafkaClient(object):
CLIENT_ID = "kafka-python"
ID_GEN = count()

def __init__(self, host, port, bufsize=4096, client_id=CLIENT_ID):
def __init__(self, host, port, client_id=CLIENT_ID, timeout=10):
# We need one connection to bootstrap
self.bufsize = bufsize
self.client_id = client_id
self.timeout = timeout
self.conns = { # (host, port) -> KafkaConnection
(host, port): KafkaConnection(host, port, bufsize)
(host, port): KafkaConnection(host, port, timeout=timeout)
}
self.brokers = {} # broker_id -> BrokerMetadata
self.topics_to_brokers = {} # topic_id -> broker_id
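
The constructor swap above is the PR's central API change: callers stop sizing a receive buffer and instead bound how long a socket operation may block. Illustratively (hypothetical broker address; not runnable without a broker):

# Before this PR:  KafkaClient('localhost', 9092, bufsize=4096)
# After this PR:   KafkaClient('localhost', 9092, timeout=10)
client = KafkaClient('localhost', 9092, timeout=10)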
@@ -41,7 +42,7 @@ def _get_conn_for_broker(self, broker):
"""
if (broker.host, broker.port) not in self.conns:
self.conns[(broker.host, broker.port)] = \
KafkaConnection(broker.host, broker.port, self.bufsize)
KafkaConnection(broker.host, broker.port, timeout=self.timeout)

return self.conns[(broker.host, broker.port)]

@@ -165,14 +166,24 @@ def _send_broker_aware_request(self, payloads, encoder_fn, decoder_fn):
request = encoder_fn(client_id=self.client_id,
correlation_id=requestId, payloads=payloads)

failed = False
# Send the request, recv the response
try:
conn.send(requestId, request)
if decoder_fn is None:
continue
response = conn.recv(requestId)
except ConnectionError, e: # ignore BufferUnderflow for now
log.warning("Could not send request [%s] to server %s: %s" % (request, conn, e))
try:
response = conn.recv(requestId)
except ConnectionError, e:
log.warning("Could not receive response to request [%s] "
"from server %s: %s", request, conn, e)
failed = True
except ConnectionError, e:
log.warning("Could not send request [%s] to server %s: %s",
request, conn, e)
failed = True

if failed:
failed_payloads += payloads
self.topics_to_brokers = {} # reset metadata
continue
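
Stripped of the Kafka protocol details, the failure-tracking flow this hunk introduces looks roughly like the sketch below. The names (send_broker_aware, encode_fn, decode_fn) are hypothetical stand-ins, and ConnectionError is defined locally so the snippet stands alone; only the control flow mirrors the diff.

class ConnectionError(Exception):
    pass

def send_broker_aware(conns_with_payloads, encode_fn, decode_fn=None):
    """Send each batch of payloads; return responses plus the batches that failed."""
    responses = []
    failed_payloads = []
    for conn, payloads in conns_with_payloads:
        failed = False
        try:
            conn.send(encode_fn(payloads))
            if decode_fn is not None:
                try:
                    responses.append(decode_fn(conn.recv()))
                except ConnectionError:
                    failed = True  # sent, but the response never arrived
        except ConnectionError:
            failed = True          # the send itself failed
        if failed:
            # Collect the whole batch for retry; the caller should also
            # reset its broker metadata, since stale leadership is the
            # usual cause of these failures.
            failed_payloads += payloads
    return responses, failed_payloads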
76 changes: 34 additions & 42 deletions kafka/conn.py
@@ -4,7 +4,6 @@
import struct
from threading import local

from kafka.common import BufferUnderflowError
from kafka.common import ConnectionError

log = logging.getLogger("kafka")
@@ -19,14 +18,14 @@ class KafkaConnection(local):
we can do something in here to facilitate multiplexed requests/responses
since the Kafka API includes a correlation id.
"""
def __init__(self, host, port, bufsize=4096):
def __init__(self, host, port, timeout=10):
super(KafkaConnection, self).__init__()
self.host = host
self.port = port
self.bufsize = bufsize
self._sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self._sock.connect((host, port))
self._sock.settimeout(10)
self.timeout = timeout
self._sock.settimeout(self.timeout)
self._dirty = False

def __str__(self):
@@ -36,44 +35,31 @@ def __str__(self):
# Private API #
###################

def _consume_response(self):
"""
Fully consume the response iterator
"""
return "".join(self._consume_response_iter())

def _consume_response_iter(self):
"""
This method handles the response header and error messages. It
then returns an iterator for the chunks of the response
"""
log.debug("Handling response from Kafka")

# Read the size off of the header
resp = self._sock.recv(4)
if resp == "":
self._raise_connection_error()
(size,) = struct.unpack('>i', resp)

messagesize = size - 4
log.debug("About to read %d bytes from Kafka", messagesize)

# Read the remainder of the response
total = 0
while total < messagesize:
resp = self._sock.recv(self.bufsize)
log.debug("Read %d bytes from Kafka", len(resp))
if resp == "":
raise BufferUnderflowError(
"Not enough data to read this response")

total += len(resp)
yield resp

def _raise_connection_error(self):
self._dirty = True
raise ConnectionError("Kafka @ {}:{} went away".format(self.host, self.port))

def _read_bytes(self, num_bytes):
bytes_left = num_bytes
resp = ''
log.debug("About to read %d bytes from Kafka", num_bytes)
if self._dirty:
Contributor

Stylistically, I think this check should probably move into recv() instead of _read_bytes() (i.e. give this method less responsibility).

Collaborator (Author)

The reason I put it here is that it sits right before the call to self._sock.recv(), just as it does right before self._sock.sendall() in send().

self.reinit()
while bytes_left:
try:
data = self._sock.recv(bytes_left)
except socket.error:
log.exception('Unable to receive data from Kafka')
self._raise_connection_error()
if data == '':
log.error("Not enough data to read this response")
self._raise_connection_error()
bytes_left -= len(data)
log.debug("Read %d/%d bytes from Kafka", num_bytes - bytes_left, num_bytes)
resp += data

return resp
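
For comparison, the reviewer's suggested placement would look roughly like this (illustrative only, not code from the PR; it assumes the surrounding class and the module-level struct import): recv() heals a dirty connection before any read, and _read_bytes() keeps a single responsibility.

def recv(self, request_id):
    # Reviewer's alternative: check the dirty flag here, once per response,
    # instead of inside _read_bytes().
    if self._dirty:
        self.reinit()
    resp = self._read_bytes(4)        # 4-byte big-endian size header
    (size,) = struct.unpack('>i', resp)
    return self._read_bytes(size)     # then exactly `size` payload bytes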

##################
# Public API #
##################
@@ -89,7 +75,7 @@ def send(self, request_id, payload):
sent = self._sock.sendall(payload)
if sent is not None:
self._raise_connection_error()
except socket.error:
except socket.error, e:
log.exception('Unable to send payload to Kafka')
self._raise_connection_error()

@@ -98,8 +84,14 @@ def recv(self, request_id):
Get a response from Kafka
"""
log.debug("Reading response %d from Kafka" % request_id)
self.data = self._consume_response()
return self.data
# Read the size off of the header
resp = self._read_bytes(4)

(size,) = struct.unpack('>i', resp)

# Read the remainder of the response
resp = self._read_bytes(size)
return str(resp)
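
The rewritten recv() makes the wire framing explicit: a response is a 4-byte big-endian length followed by exactly that many payload bytes. A self-contained round trip of that framing, independent of any broker:

import struct

def frame(payload):
    # Length-prefix the payload the way a Kafka response is framed.
    return struct.pack('>i', len(payload)) + payload

def unframe(data):
    (size,) = struct.unpack('>i', data[:4])
    return data[4:4 + size]

assert unframe(frame(b'hello')) == b'hello'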

def copy(self):
"""
@@ -124,5 +116,5 @@ def reinit(self):
self.close()
self._sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self._sock.connect((self.host, self.port))
self._sock.settimeout(10)
self._sock.settimeout(self.timeout)
self._dirty = False
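
The one-line change to reinit() closes a subtle gap: the old code hard-coded a 10-second timeout on reconnect, silently discarding whatever the caller had configured. A hypothetical usage sketch (placeholder address; requires a running broker):

conn = KafkaConnection('localhost', 9092, timeout=2)
# ... a socket error marks the connection dirty ...
conn.reinit()  # now reconnects with timeout=2, not a hard-coded 10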