55
66from mock import patch
77
8+ from kafka import KafkaClient
89from kafka .common import (
910 ProduceRequest , FetchRequest , Message , ChecksumError ,
1011 ConsumerFetchSizeTooSmall , ProduceResponse , FetchResponse ,
1112 OffsetAndMessage , BrokerMetadata , PartitionMetadata
1213)
14+ from kafka .common import KafkaUnavailableError
1315from kafka .codec import (
1416 has_gzip , has_snappy , gzip_encode , gzip_decode ,
1517 snappy_encode , snappy_decode
@@ -384,6 +386,26 @@ def test_decode_offset_fetch_response(self):
384386
385387class TestKafkaClient (unittest .TestCase ):
386388
389+ def test_init_with_list (self ):
390+
391+ with patch .object (KafkaClient , 'load_metadata_for_topics' ):
392+ client = KafkaClient (
393+ hosts = ['kafka01:9092' , 'kafka02:9092' , 'kafka03:9092' ])
394+
395+ self .assertItemsEqual (
396+ [('kafka01' , 9092 ), ('kafka02' , 9092 ), ('kafka03' , 9092 )],
397+ client .hosts )
398+
399+ def test_init_with_csv (self ):
400+
401+ with patch .object (KafkaClient , 'load_metadata_for_topics' ):
402+ client = KafkaClient (
403+ hosts = 'kafka01:9092,kafka02:9092,kafka03:9092' )
404+
405+ self .assertItemsEqual (
406+ [('kafka01' , 9092 ), ('kafka02' , 9092 ), ('kafka03' , 9092 )],
407+ client .hosts )
408+
387409 def test_send_broker_unaware_request_fail (self ):
388410 'Tests that call fails when all hosts are unavailable'
389411
@@ -402,14 +424,16 @@ def mock_get_conn(host, port):
402424 return mocked_conns [(host , port )]
403425
404426 # patch to avoid making requests before we want it
405- with patch .object (KafkaClient , '_load_metadata_for_topics ' ), \
427+ with patch .object (KafkaClient , 'load_metadata_for_topics ' ), \
406428 patch .object (KafkaClient , '_get_conn' , side_effect = mock_get_conn ):
407429
408- client = KafkaClient (hosts = 'kafka01:9092, kafka02:9092' )
430+ client = KafkaClient (hosts = [ 'kafka01:9092' , ' kafka02:9092'] )
409431
410- resp = client ._send_broker_unaware_request (1 , 'fake request' )
411432
412- self .assertIsNone (resp )
433+ self .assertRaises (
434+ KafkaUnavailableError ,
435+ client ._send_broker_unaware_request ,
436+ 1 , 'fake request' )
413437
414438 for key , conn in mocked_conns .iteritems ():
415439 conn .send .assert_called_with (1 , 'fake request' )
@@ -434,7 +458,7 @@ def mock_get_conn(host, port):
434458 return mocked_conns [(host , port )]
435459
436460 # patch to avoid making requests before we want it
437- with patch .object (KafkaClient , '_load_metadata_for_topics ' ), \
461+ with patch .object (KafkaClient , 'load_metadata_for_topics ' ), \
438462 patch .object (KafkaClient , '_get_conn' , side_effect = mock_get_conn ):
439463
440464 client = KafkaClient (hosts = 'kafka01:9092,kafka02:9092' )
@@ -444,7 +468,7 @@ def mock_get_conn(host, port):
444468 self .assertEqual ('valid response' , resp )
445469 mocked_conns [('kafka02' , 9092 )].recv .assert_called_with (1 )
446470
447- @unittest .skip ('requires disabling recursion on _load_metadata_for_topics ' )
471+ @unittest .skip ('requires disabling recursion on load_metadata_for_topics ' )
448472 @patch ('kafka.client.KafkaConnection' )
449473 @patch ('kafka.client.KafkaProtocol' )
450474 def test_client_load_metadata (self , protocol , conn ):
@@ -474,7 +498,7 @@ def test_client_load_metadata(self, protocol, conn):
474498 },
475499 client .topics_to_brokers )
476500
477- @unittest .skip ('requires disabling recursion on _load_metadata_for_topics ' )
501+ @unittest .skip ('requires disabling recursion on load_metadata_for_topics ' )
478502 @patch ('kafka.client.KafkaConnection' )
479503 @patch ('kafka.client.KafkaProtocol' )
480504 def test_client_load_metadata_unassigned_partitions (self , protocol , conn ):
@@ -513,7 +537,7 @@ def test_client_load_metadata_unassigned_partitions(self, protocol, conn):
513537 },
514538 client .topics_to_brokers )
515539
516- @unittest .skip ('requires disabling recursion on _load_metadata_for_topics ' )
540+ @unittest .skip ('requires disabling recursion on load_metadata_for_topics ' )
517541 @patch ('kafka.client.KafkaConnection' )
518542 @patch ('kafka.client.KafkaProtocol' )
519543 def test_client_load_metadata_noleader_partitions (self , protocol , conn ):
0 commit comments