6
6
import logging
7
7
import random
8
8
import threading
9
+ import weakref
9
10
10
11
# selectors in stdlib as of py3.4
11
12
try :
27
28
from .metrics .stats import Avg , Count , Rate
28
29
from .metrics .stats .rate import TimeUnit
29
30
from .protocol .metadata import MetadataRequest
31
+ from .util import Dict , WeakMethod
30
32
# Although this looks unused, it actually monkey-patches socket.socketpair()
31
33
# and should be left in as long as we're using socket.socketpair() in this file
32
34
from .vendor import socketpair
@@ -197,7 +199,7 @@ def __init__(self, **configs):
197
199
self ._topics = set () # empty set will fetch all topic metadata
198
200
self ._metadata_refresh_in_progress = False
199
201
self ._selector = self .config ['selector' ]()
200
- self ._conns = {}
202
+ self ._conns = Dict () # object to support weakrefs
201
203
self ._connecting = set ()
202
204
self ._refresh_on_disconnects = True
203
205
self ._last_bootstrap = 0
@@ -220,7 +222,7 @@ def __init__(self, **configs):
220
222
if self .config ['metrics' ]:
221
223
self ._sensors = KafkaClientMetrics (self .config ['metrics' ],
222
224
self .config ['metric_group_prefix' ],
223
- self ._conns )
225
+ weakref . proxy ( self ._conns ) )
224
226
225
227
self ._bootstrap (collect_hosts (self .config ['bootstrap_servers' ]))
226
228
@@ -248,7 +250,7 @@ def _bootstrap(self, hosts):
248
250
249
251
for host , port , afi in hosts :
250
252
log .debug ("Attempting to bootstrap via node at %s:%s" , host , port )
251
- cb = functools .partial (self ._conn_state_change , 'bootstrap' )
253
+ cb = functools .partial (WeakMethod ( self ._conn_state_change ) , 'bootstrap' )
252
254
bootstrap = BrokerConnection (host , port , afi ,
253
255
state_change_callback = cb ,
254
256
node_id = 'bootstrap' ,
@@ -357,7 +359,7 @@ def _maybe_connect(self, node_id):
357
359
log .debug ("Initiating connection to node %s at %s:%s" ,
358
360
node_id , broker .host , broker .port )
359
361
host , port , afi = get_ip_port_afi (broker .host )
360
- cb = functools .partial (self ._conn_state_change , node_id )
362
+ cb = functools .partial (WeakMethod ( self ._conn_state_change ) , node_id )
361
363
conn = BrokerConnection (host , broker .port , afi ,
362
364
state_change_callback = cb ,
363
365
node_id = node_id ,
@@ -404,6 +406,13 @@ def connected(self, node_id):
404
406
return False
405
407
return self ._conns [node_id ].connected ()
406
408
409
+ def _close (self ):
410
+ if not self ._closed :
411
+ self ._closed = True
412
+ self ._wake_r .close ()
413
+ self ._wake_w .close ()
414
+ self ._selector .close ()
415
+
407
416
def close (self , node_id = None ):
408
417
"""Close one or all broker connections.
409
418
@@ -412,18 +421,18 @@ def close(self, node_id=None):
412
421
"""
413
422
with self ._lock :
414
423
if node_id is None :
415
- self ._closed = True
424
+ self ._close ()
416
425
for conn in self ._conns .values ():
417
426
conn .close ()
418
- self ._wake_r .close ()
419
- self ._wake_w .close ()
420
- self ._selector .close ()
421
427
elif node_id in self ._conns :
422
428
self ._conns [node_id ].close ()
423
429
else :
424
430
log .warning ("Node %s not found in current connection list; skipping" , node_id )
425
431
return
426
432
433
+ def __del__ (self ):
434
+ self ._close ()
435
+
427
436
def is_disconnected (self , node_id ):
428
437
"""Check whether the node connection has been disconnected or failed.
429
438
0 commit comments