Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
36 commits
Select commit Hold shift + click to select a range
410567e
Check for socket status on read as well as send
turtlesoupy Jan 7, 2014
83b7adc
Propagate error immediately if dirty
turtlesoupy Jan 7, 2014
6ba62e7
Enable absolute imports for modules using Queue.
Oct 21, 2013
6c1fbaf
Allow customizing socket timeouts.
rdiomar Dec 19, 2013
2937028
Read the correct number of bytes from kafka.
rdiomar Dec 19, 2013
afa54fe
* Guarantee reading the expected number of bytes from the socket ever…
rdiomar Dec 19, 2013
c1877ad
Allow None timeout in FetchContext even if block is False
rdiomar Dec 19, 2013
2dd78f2
Reset consumer fields to original values rather than defaults in Fetc…
rdiomar Dec 19, 2013
225db8e
SimpleConsumer flow changes:
rdiomar Dec 19, 2013
781d6aa
Remove SimpleConsumer queue size limit since it can cause the iterator
rdiomar Dec 19, 2013
8570457
Add buffer_size param description to docstring
rdiomar Dec 19, 2013
6840af8
Add iter_timeout option to SimpleConsumer. If not None, it causes the…
rdiomar Dec 19, 2013
3b8f445
Add comments and maintain 80 character line limit
rdiomar Dec 19, 2013
a303105
Add and fix comments to protocol.py
rdiomar Jan 3, 2014
a31156e
Add note about questionable error handling while decoding messages.
rdiomar Jan 3, 2014
39d76bb
Fix unit tests.
rdiomar Jan 3, 2014
8009bb0
Style fix for imports
rdiomar Jan 3, 2014
b7ee169
Fix seek offset deltas
rdiomar Jan 3, 2014
2724043
Raise a ConnectionError when a socket.error is raised when receiving …
rdiomar Jan 3, 2014
34da78b
Fix client error handling
rdiomar Jan 3, 2014
3dfed95
Add a limit to fetch buffer size, and actually retry requests when fe…
rdiomar Jan 3, 2014
9802393
Handle starting/stopping Kafka brokers that are already started/stopp…
rdiomar Jan 3, 2014
1e3f24b
Remove unnecessary brackets
rdiomar Jan 4, 2014
339650f
Fix client and consumer params in integration tests
rdiomar Jan 4, 2014
fcf23c4
Add tests for limited and unlimited consumer max_buffer_size
rdiomar Jan 4, 2014
ac411bf
Make kafka brokers per-test in failover integration tests
rdiomar Jan 6, 2014
7a999a1
Add object type and ID to message prefix in fixtures output for easie…
rdiomar Jan 6, 2014
a61990b
Use the same timeout when reinitializing a connection
rdiomar Jan 8, 2014
c33c0d5
Handle dirty flag in conn.recv()
rdiomar Jan 8, 2014
86770b8
Remove unnecessary method
rdiomar Jan 8, 2014
4afe2dd
Skip snappy/gzip tests if they're not available
rdiomar Jan 8, 2014
d4d7981
Some cleanup and easier to read test fixture output
rdiomar Jan 8, 2014
a2cfc70
Change BufferUnderflowError to ConnectionError in conn._read_bytes()
rdiomar Jan 8, 2014
2b4451c
Change log.error() back to log.exception()
rdiomar Jan 8, 2014
e303b37
Check for socket status on read as well as send
turtlesoupy Jan 7, 2014
a6c25fd
Propagate error immediately if dirty
turtlesoupy Jan 7, 2014
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 21 additions & 10 deletions kafka/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,12 @@
from functools import partial
from itertools import count
import logging
import socket
import time

from kafka.common import ErrorMapping, TopicAndPartition
from kafka.common import ConnectionError, FailedPayloadsException
from kafka.common import (
ErrorMapping, TopicAndPartition, ConnectionError,
FailedPayloadsException
)
from kafka.conn import KafkaConnection
from kafka.protocol import KafkaProtocol

Expand All @@ -19,12 +20,12 @@ class KafkaClient(object):
CLIENT_ID = "kafka-python"
ID_GEN = count()

def __init__(self, host, port, bufsize=4096, client_id=CLIENT_ID):
def __init__(self, host, port, client_id=CLIENT_ID, timeout=10):
# We need one connection to bootstrap
self.bufsize = bufsize
self.client_id = client_id
self.timeout = timeout
self.conns = { # (host, port) -> KafkaConnection
(host, port): KafkaConnection(host, port, bufsize)
(host, port): KafkaConnection(host, port, timeout=timeout)
}
self.brokers = {} # broker_id -> BrokerMetadata
self.topics_to_brokers = {} # topic_id -> broker_id
def _get_conn_for_broker(self, broker):
    """
    Get or create a connection to a broker using host and port
    """
    key = (broker.host, broker.port)
    if key not in self.conns:
        # Lazily open one KafkaConnection per broker, reusing it afterwards
        self.conns[key] = KafkaConnection(broker.host, broker.port,
                                          timeout=self.timeout)
    return self.conns[key]

Expand Down Expand Up @@ -165,14 +166,24 @@ def _send_broker_aware_request(self, payloads, encoder_fn, decoder_fn):
request = encoder_fn(client_id=self.client_id,
correlation_id=requestId, payloads=payloads)

failed = False
# Send the request, recv the response
try:
conn.send(requestId, request)
if decoder_fn is None:
continue
response = conn.recv(requestId)
except ConnectionError, e: # ignore BufferUnderflow for now
log.warning("Could not send request [%s] to server %s: %s" % (request, conn, e))
try:
response = conn.recv(requestId)
except ConnectionError, e:
log.warning("Could not receive response to request [%s] "
"from server %s: %s", request, conn, e)
failed = True
except ConnectionError, e:
log.warning("Could not send request [%s] to server %s: %s",
request, conn, e)
failed = True

if failed:
failed_payloads += payloads
self.topics_to_brokers = {} # reset metadata
continue
Expand Down
74 changes: 33 additions & 41 deletions kafka/conn.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
import struct
from threading import local

from kafka.common import BufferUnderflowError
from kafka.common import ConnectionError

log = logging.getLogger("kafka")
Expand All @@ -19,14 +18,14 @@ class KafkaConnection(local):
we can do something in here to facilitate multiplexed requests/responses
since the Kafka API includes a correlation id.
"""
def __init__(self, host, port, timeout=10):
    """
    Open a blocking TCP connection to a Kafka broker.

    host    -- broker hostname or IP
    port    -- broker port
    timeout -- socket timeout in seconds, applied to every socket op
    """
    super(KafkaConnection, self).__init__()
    self.host = host
    self.port = port
    self._sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    self._sock.connect((host, port))
    self.timeout = timeout
    self._sock.settimeout(self.timeout)
    # Set to True when a socket op fails; forces reinit() before reuse
    self._dirty = False

def __str__(self):
Expand All @@ -36,44 +35,31 @@ def __str__(self):
# Private API #
###################

def _consume_response(self):
    """
    Fully consume the response iterator

    Drains _consume_response_iter and returns the whole response
    payload as a single string.
    """
    return "".join(self._consume_response_iter())

def _consume_response_iter(self):
    """
    This method handles the response header and error messages. It
    then returns an iterator for the chunks of the response
    """
    log.debug("Handling response from Kafka")

    # Read the size off of the header
    # NOTE(review): a single recv(4) may legally return fewer than 4
    # bytes, which would make the struct.unpack below fail — confirm
    # whether short header reads are possible here
    resp = self._sock.recv(4)
    if resp == "":
        self._raise_connection_error()
    (size,) = struct.unpack('>i', resp)

    # NOTE(review): subtracting 4 assumes the size field counts itself;
    # verify against the Kafka wire protocol
    messagesize = size - 4
    log.debug("About to read %d bytes from Kafka", messagesize)

    # Read the remainder of the response
    total = 0
    while total < messagesize:
        # recv may return up to bufsize bytes; '' means the peer closed
        resp = self._sock.recv(self.bufsize)
        log.debug("Read %d bytes from Kafka", len(resp))
        if resp == "":
            raise BufferUnderflowError(
                "Not enough data to read this response")

        total += len(resp)
        yield resp

def _raise_connection_error(self):
    """Mark this connection as dirty and raise ConnectionError."""
    # Once dirty, the next send/recv must reinit() before using the socket
    self._dirty = True
    message = "Kafka @ {}:{} went away".format(self.host, self.port)
    raise ConnectionError(message)

def _read_bytes(self, num_bytes):
    """
    Read exactly num_bytes from the socket and return them as a str.

    Reinitializes the connection first if it has been marked dirty.
    Raises ConnectionError (via _raise_connection_error) on a socket
    error or if the peer closes the connection before num_bytes arrive.
    """
    bytes_left = num_bytes
    # Accumulate chunks and join once at the end; repeated str
    # concatenation in the loop is O(n^2) for large responses
    chunks = []
    log.debug("About to read %d bytes from Kafka", num_bytes)
    if self._dirty:
        self.reinit()
    while bytes_left:
        try:
            data = self._sock.recv(bytes_left)
        except socket.error:
            log.exception('Unable to receive data from Kafka')
            self._raise_connection_error()
        # recv returning '' means the remote end closed the connection
        if data == '':
            log.error("Not enough data to read this response")
            self._raise_connection_error()
        bytes_left -= len(data)
        log.debug("Read %d/%d bytes from Kafka", num_bytes - bytes_left, num_bytes)
        chunks.append(data)

    return ''.join(chunks)

##################
# Public API #
##################
def recv(self, request_id):
    """
    Get a response from Kafka

    request_id -- correlation id of the request (used for logging only
                  in this method)

    Returns the response payload (everything after the 4-byte size
    header) as a str.
    """
    # Use lazy %-style logging args, consistent with the other log
    # calls in this module, so the message is only formatted when
    # debug logging is enabled
    log.debug("Reading response %d from Kafka", request_id)

    # Read the size off of the header
    resp = self._read_bytes(4)

    (size,) = struct.unpack('>i', resp)

    # Read the remainder of the response
    resp = self._read_bytes(size)
    return str(resp)

def copy(self):
"""
def reinit(self):
    """
    Re-initialize the socket connection

    Closes the current socket, opens a fresh one to the same broker
    with the configured timeout, and clears the dirty flag.
    """
    self.close()
    self._sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    self._sock.connect((self.host, self.port))
    self._sock.settimeout(self.timeout)
    self._dirty = False
Loading