Skip to content

Commit acbc346

Browse files
authored
Connect with sockaddrs to support non-zero ipv6 scope ids (#1433)
1 parent 3dc536a commit acbc346

File tree

2 files changed

+22
-28
lines changed

2 files changed

+22
-28
lines changed

kafka/conn.py

Lines changed: 10 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -215,9 +215,8 @@ def __init__(self, host, port, afi, **configs):
215215
self.host = host
216216
self.port = port
217217
self.afi = afi
218-
self._sock_ip = host
219-
self._sock_port = port
220218
self._sock_afi = afi
219+
self._sock_addr = None
221220
self.in_flight_requests = collections.deque()
222221
self._api_versions = None
223222

@@ -279,13 +278,12 @@ def _dns_lookup(self):
279278
return False
280279
return True
281280

282-
def _next_afi_host_port(self):
281+
def _next_afi_sockaddr(self):
283282
if not self._gai:
284283
if not self._dns_lookup():
285284
return
286285
afi, _, __, ___, sockaddr = self._gai.pop(0)
287-
host, port = sockaddr[:2]
288-
return (afi, host, port)
286+
return (afi, sockaddr)
289287

290288
def connect_blocking(self, timeout=float('inf')):
291289
if self.connected():
@@ -327,13 +325,13 @@ def connect(self):
327325
"""Attempt to connect and return ConnectionState"""
328326
if self.state is ConnectionStates.DISCONNECTED and not self.blacked_out():
329327
self.last_attempt = time.time()
330-
next_lookup = self._next_afi_host_port()
328+
next_lookup = self._next_afi_sockaddr()
331329
if not next_lookup:
332330
self.close(Errors.ConnectionError('DNS failure'))
333331
return
334332
else:
335333
log.debug('%s: creating new socket', self)
336-
self._sock_afi, self._sock_ip, self._sock_port = next_lookup
334+
self._sock_afi, self._sock_addr = next_lookup
337335
self._sock = socket.socket(self._sock_afi, socket.SOCK_STREAM)
338336

339337
for option in self.config['socket_options']:
@@ -348,17 +346,16 @@ def connect(self):
348346
# so we need to double check that we are still connecting before
349347
if self.connecting():
350348
self.config['state_change_callback'](self)
351-
log.info('%s: connecting to %s:%d [%s:%d %s]', self, self.host,
352-
self.port, self._sock_ip, self._sock_port,
353-
AFI_NAMES[self._sock_afi])
349+
log.info('%s: connecting to %s:%d [%s %s]', self, self.host,
350+
self.port, self._sock_addr, AFI_NAMES[self._sock_afi])
354351

355352
if self.state is ConnectionStates.CONNECTING:
356353
# in non-blocking mode, use repeated calls to socket.connect_ex
357354
# to check connection status
358355
request_timeout = self.config['request_timeout_ms'] / 1000.0
359356
ret = None
360357
try:
361-
ret = self._sock.connect_ex((self._sock_ip, self._sock_port))
358+
ret = self._sock.connect_ex(self._sock_addr)
362359
except socket.error as err:
363360
ret = err.errno
364361

@@ -999,9 +996,9 @@ def check_version(self, timeout=2, strict=False):
999996
return version
1000997

1001998
def __str__(self):
1002-
return "<BrokerConnection node_id=%s host=%s:%d %s [%s:%d %s]>" % (
999+
return "<BrokerConnection node_id=%s host=%s:%d %s [%s %s]>" % (
10031000
self.node_id, self.host, self.port, self.state,
1004-
self._sock_ip, self._sock_port, AFI_NAMES[self._sock_afi])
1001+
AFI_NAMES[self._sock_afi], self._sock_addr)
10051002

10061003

10071004
class BrokerConnectionMetrics(object):

test/test_conn.py

Lines changed: 12 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -258,33 +258,31 @@ def test_lookup_on_connect():
258258
assert conn.host == hostname
259259
assert conn.port == port
260260
assert conn.afi == socket.AF_UNSPEC
261-
ip1 = '127.0.0.1'
262261
afi1 = socket.AF_INET
262+
sockaddr1 = ('127.0.0.1', 9092)
263263
mock_return1 = [
264-
(afi1, socket.SOCK_STREAM, 6, '', (ip1, 9092)),
264+
(afi1, socket.SOCK_STREAM, 6, '', sockaddr1),
265265
]
266266
with mock.patch("socket.getaddrinfo", return_value=mock_return1) as m:
267267
conn.connect()
268268
m.assert_called_once_with(hostname, port, 0, 1)
269-
conn.close()
270-
assert conn._sock_ip == ip1
271-
assert conn._sock_port == 9092
272269
assert conn._sock_afi == afi1
270+
assert conn._sock_addr == sockaddr1
271+
conn.close()
273272

274-
ip2 = '::1'
275273
afi2 = socket.AF_INET6
274+
sockaddr2 = ('::1', 9092, 0, 0)
276275
mock_return2 = [
277-
(afi2, socket.SOCK_STREAM, 6, '', (ip2, 9092)),
276+
(afi2, socket.SOCK_STREAM, 6, '', sockaddr2),
278277
]
279278

280279
with mock.patch("socket.getaddrinfo", return_value=mock_return2) as m:
281280
conn.last_attempt = 0
282281
conn.connect()
283282
m.assert_called_once_with(hostname, port, 0, 1)
284-
conn.close()
285-
assert conn._sock_ip == ip2
286-
assert conn._sock_port == 9092
287283
assert conn._sock_afi == afi2
284+
assert conn._sock_addr == sockaddr2
285+
conn.close()
288286

289287

290288
def test_relookup_on_failure():
@@ -300,17 +298,16 @@ def test_relookup_on_failure():
300298
assert conn.disconnected()
301299
assert conn.last_attempt > last_attempt
302300

303-
ip2 = '127.0.0.2'
304301
afi2 = socket.AF_INET
302+
sockaddr2 = ('127.0.0.2', 9092)
305303
mock_return2 = [
306-
(afi2, socket.SOCK_STREAM, 6, '', (ip2, 9092)),
304+
(afi2, socket.SOCK_STREAM, 6, '', sockaddr2),
307305
]
308306

309307
with mock.patch("socket.getaddrinfo", return_value=mock_return2) as m:
310308
conn.last_attempt = 0
311309
conn.connect()
312310
m.assert_called_once_with(hostname, port, 0, 1)
313-
conn.close()
314-
assert conn._sock_ip == ip2
315-
assert conn._sock_port == 9092
316311
assert conn._sock_afi == afi2
312+
assert conn._sock_addr == sockaddr2
313+
conn.close()

0 commit comments

Comments
 (0)