Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions kafka/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
from kafka.vendor import six

import kafka.errors
from kafka.errors import (UnknownError, ConnectionError, FailedPayloadsError,
from kafka.errors import (UnknownError, KafkaConnectionError, FailedPayloadsError,
KafkaTimeoutError, KafkaUnavailableError,
LeaderNotAvailableError, UnknownTopicOrPartitionError,
NotLeaderForPartitionError, ReplicaNotAvailableError)
Expand Down Expand Up @@ -73,7 +73,7 @@ def _get_conn(self, host, port, afi):
conn = self._conns[host_key]
if not conn.connect_blocking(self.timeout):
conn.close()
raise ConnectionError("%s:%s (%s)" % (host, port, afi))
raise KafkaConnectionError("%s:%s (%s)" % (host, port, afi))
return conn

def _get_leader_for_partition(self, topic, partition):
Expand Down Expand Up @@ -156,7 +156,7 @@ def _send_broker_unaware_request(self, payloads, encoder_fn, decoder_fn):
for (host, port, afi) in hosts:
try:
conn = self._get_conn(host, port, afi)
except ConnectionError:
except KafkaConnectionError:
log.warning("Skipping unconnected connection: %s:%s (AFI %s)",
host, port, afi)
continue
Expand Down Expand Up @@ -242,7 +242,7 @@ def failed_payloads(payloads):
host, port, afi = get_ip_port_afi(broker.host)
try:
conn = self._get_conn(host, broker.port, afi)
except ConnectionError:
except KafkaConnectionError:
refresh_metadata = True
failed_payloads(broker_payloads)
continue
Expand Down Expand Up @@ -344,8 +344,8 @@ def _send_consumer_aware_request(self, group, payloads, encoder_fn, decoder_fn):
try:
host, port, afi = get_ip_port_afi(broker.host)
conn = self._get_conn(host, broker.port, afi)
except ConnectionError as e:
log.warning('ConnectionError attempting to send request %s '
except KafkaConnectionError as e:
log.warning('KafkaConnectionError attempting to send request %s '
'to server %s: %s', request_id, broker, e)

for payload in payloads:
Expand Down
2 changes: 1 addition & 1 deletion kafka/client_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -602,7 +602,7 @@ def _poll(self, timeout):
log.warning('Protocol out of sync on %r, closing', conn)
except socket.error:
pass
conn.close(Errors.ConnectionError('Socket EVENT_READ without in-flight-requests'))
conn.close(Errors.KafkaConnectionError('Socket EVENT_READ without in-flight-requests'))
continue

self._idle_expiry_manager.update(conn.node_id)
Expand Down
28 changes: 14 additions & 14 deletions kafka/conn.py
Original file line number Diff line number Diff line change
Expand Up @@ -327,7 +327,7 @@ def connect(self):
self.last_attempt = time.time()
next_lookup = self._next_afi_sockaddr()
if not next_lookup:
self.close(Errors.ConnectionError('DNS failure'))
self.close(Errors.KafkaConnectionError('DNS failure'))
return
else:
log.debug('%s: creating new socket', self)
Expand Down Expand Up @@ -381,12 +381,12 @@ def connect(self):
log.error('Connect attempt to %s returned error %s.'
' Disconnecting.', self, ret)
errstr = errno.errorcode.get(ret, 'UNKNOWN')
self.close(Errors.ConnectionError('{} {}'.format(ret, errstr)))
self.close(Errors.KafkaConnectionError('{} {}'.format(ret, errstr)))

# Connection timed out
elif time.time() > request_timeout + self.last_attempt:
log.error('Connection attempt to %s timed out', self)
self.close(Errors.ConnectionError('timeout'))
self.close(Errors.KafkaConnectionError('timeout'))

# Needs retry
else:
Expand Down Expand Up @@ -463,7 +463,7 @@ def _try_handshake(self):
pass
except (SSLZeroReturnError, ConnectionError, SSLEOFError):
log.warning('SSL connection closed by server during handshake.')
self.close(Errors.ConnectionError('SSL connection closed by server during handshake'))
self.close(Errors.KafkaConnectionError('SSL connection closed by server during handshake'))
# Other SSLErrors will be raised to user

return False
Expand All @@ -488,7 +488,7 @@ def _try_authenticate(self):
return False
elif self._sasl_auth_future.failed():
ex = self._sasl_auth_future.exception
if not isinstance(ex, Errors.ConnectionError):
if not isinstance(ex, Errors.KafkaConnectionError):
raise ex # pylint: disable-msg=raising-bad-type
return self._sasl_auth_future.succeeded()

Expand Down Expand Up @@ -558,8 +558,8 @@ def _try_authenticate_plain(self, future):
data = self._recv_bytes_blocking(4)

except ConnectionError as e:
log.exception("%s: Error receiving reply from server", self)
error = Errors.ConnectionError("%s: %s" % (self, e))
log.exception("%s: Error receiving reply from server", self)
error = Errors.KafkaConnectionError("%s: %s" % (self, e))
self.close(error=error)
return future.failure(error)

Expand Down Expand Up @@ -621,7 +621,7 @@ def _try_authenticate_gssapi(self, future):

except ConnectionError as e:
log.exception("%s: Error receiving reply from server", self)
error = Errors.ConnectionError("%s: %s" % (self, e))
error = Errors.KafkaConnectionError("%s: %s" % (self, e))
self.close(error=error)
return future.failure(error)
except Exception as e:
Expand Down Expand Up @@ -701,7 +701,7 @@ def close(self, error=None):
Arguments:
error (Exception, optional): pending in-flight-requests
will be failed with this exception.
Default: kafka.errors.ConnectionError.
Default: kafka.errors.KafkaConnectionError.
"""
if self.state is ConnectionStates.DISCONNECTED:
if error is not None:
Expand Down Expand Up @@ -733,7 +733,7 @@ def send(self, request):
if self.connecting():
return future.failure(Errors.NodeNotReadyError(str(self)))
elif not self.connected():
return future.failure(Errors.ConnectionError(str(self)))
return future.failure(Errors.KafkaConnectionError(str(self)))
elif not self.can_send_more():
return future.failure(Errors.TooManyInFlightRequests(str(self)))
return self._send(request)
Expand All @@ -753,7 +753,7 @@ def _send(self, request):
self._sensors.bytes_sent.record(total_bytes)
except ConnectionError as e:
log.exception("Error sending %s to %s", request, self)
error = Errors.ConnectionError("%s: %s" % (self, e))
error = Errors.KafkaConnectionError("%s: %s" % (self, e))
self.close(error=error)
return future.failure(error)
log.debug('%s Request %d: %s', self, correlation_id, request)
Expand Down Expand Up @@ -781,7 +781,7 @@ def recv(self):
# If requests are pending, we should close the socket and
# fail all the pending request futures
if self.in_flight_requests:
self.close(Errors.ConnectionError('Socket not connected during recv with in-flight-requests'))
self.close(Errors.KafkaConnectionError('Socket not connected during recv with in-flight-requests'))
return ()

elif not self.in_flight_requests:
Expand Down Expand Up @@ -821,7 +821,7 @@ def _recv(self):
# without an exception raised
if not data:
log.error('%s: socket disconnected', self)
self.close(error=Errors.ConnectionError('socket disconnected'))
self.close(error=Errors.KafkaConnectionError('socket disconnected'))
return []
else:
recvd.append(data)
Expand All @@ -833,7 +833,7 @@ def _recv(self):
break
log.exception('%s: Error receiving network data'
' closing socket', self)
self.close(error=Errors.ConnectionError(e))
self.close(error=Errors.KafkaConnectionError(e))
return []
except BlockingIOError:
if six.PY3:
Expand Down
6 changes: 3 additions & 3 deletions kafka/errors.py
Original file line number Diff line number Diff line change
Expand Up @@ -447,7 +447,7 @@ def __init__(self, payload, *args):
self.payload = payload


class ConnectionError(KafkaError):
class KafkaConnectionError(KafkaError):
retriable = True
invalid_metadata = True

Expand Down Expand Up @@ -517,13 +517,13 @@ def check_error(response):

RETRY_BACKOFF_ERROR_TYPES = (
KafkaUnavailableError, LeaderNotAvailableError,
ConnectionError, FailedPayloadsError
KafkaConnectionError, FailedPayloadsError
)


RETRY_REFRESH_ERROR_TYPES = (
NotLeaderForPartitionError, UnknownTopicOrPartitionError,
LeaderNotAvailableError, ConnectionError
LeaderNotAvailableError, KafkaConnectionError
)


Expand Down
1 change: 0 additions & 1 deletion kafka/producer/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -372,7 +372,6 @@ def send_messages(self, topic, partition, *msg):
Raises:
FailedPayloadsError: low-level connection error, can be caused by
networking failures, or a malformed request.
ConnectionError:
KafkaUnavailableError: all known brokers are down when attempting
to refresh metadata.
LeaderNotAvailableError: topic or partition is initializing or
Expand Down
2 changes: 1 addition & 1 deletion test/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from kafka import SimpleClient
from kafka.errors import (
KafkaUnavailableError, LeaderNotAvailableError, KafkaTimeoutError,
UnknownTopicOrPartitionError, ConnectionError, FailedPayloadsError)
UnknownTopicOrPartitionError, FailedPayloadsError)
from kafka.future import Future
from kafka.protocol import KafkaProtocol, create_message
from kafka.protocol.metadata import MetadataResponse
Expand Down
4 changes: 2 additions & 2 deletions test/test_conn.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ def test_send_disconnected(conn):
conn.state = ConnectionStates.DISCONNECTED
f = conn.send('foobar')
assert f.failed() is True
assert isinstance(f.exception, Errors.ConnectionError)
assert isinstance(f.exception, Errors.KafkaConnectionError)


def test_send_connecting(conn):
Expand Down Expand Up @@ -162,7 +162,7 @@ def test_send_error(_socket, conn):
_socket.send.side_effect = socket.error
f = conn.send(req)
assert f.failed() is True
assert isinstance(f.exception, Errors.ConnectionError)
assert isinstance(f.exception, Errors.KafkaConnectionError)
assert _socket.close.call_count == 1
assert conn.state is ConnectionStates.DISCONNECTED

Expand Down
6 changes: 3 additions & 3 deletions test/test_failover_integration.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

from kafka import SimpleClient, SimpleConsumer, KeyedProducer
from kafka.errors import (
FailedPayloadsError, ConnectionError, RequestTimedOutError,
FailedPayloadsError, KafkaConnectionError, RequestTimedOutError,
NotLeaderForPartitionError)
from kafka.producer.base import Producer
from kafka.structs import TopicPartition
Expand Down Expand Up @@ -79,7 +79,7 @@ def test_switch_leader(self):
producer.send_messages(topic, partition, b'success')
log.debug("success!")
recovered = True
except (FailedPayloadsError, ConnectionError, RequestTimedOutError,
except (FailedPayloadsError, KafkaConnectionError, RequestTimedOutError,
NotLeaderForPartitionError):
log.debug("caught exception sending message -- will retry")
continue
Expand Down Expand Up @@ -167,7 +167,7 @@ def test_switch_leader_keyed_producer(self):
producer.send_messages(topic, key, msg)
if producer.partitioners[topic].partition(key) == 0:
recovered = True
except (FailedPayloadsError, ConnectionError, RequestTimedOutError,
except (FailedPayloadsError, KafkaConnectionError, RequestTimedOutError,
NotLeaderForPartitionError):
log.debug("caught exception sending message -- will retry")
continue
Expand Down