33import struct
44import unittest
55
6+ from mock import MagicMock , patch
7+
8+
9+ from kafka import KafkaClient
610from kafka .common import (
711 ProduceRequest , FetchRequest , Message , ChecksumError ,
812 ConsumerFetchSizeTooSmall , ProduceResponse , FetchResponse ,
913 OffsetAndMessage , BrokerMetadata , PartitionMetadata
1014)
15+ from kafka .common import KafkaUnavailableError
1116from kafka .codec import (
1217 has_gzip , has_snappy , gzip_encode , gzip_decode ,
1318 snappy_encode , snappy_decode
@@ -405,7 +410,6 @@ def test_encode_offset_request(self):
405410 def test_decode_offset_response (self ):
406411 pass
407412
408-
409413 @unittest .skip ("Not Implemented" )
410414 def test_encode_offset_commit_request (self ):
411415 pass
@@ -423,5 +427,83 @@ def test_decode_offset_fetch_response(self):
423427 pass
424428
425429
430+ class TestKafkaClient (unittest .TestCase ):
431+
432+ def test_init_with_list (self ):
433+
434+ with patch .object (KafkaClient , 'load_metadata_for_topics' ):
435+ client = KafkaClient (
436+ hosts = ['kafka01:9092' , 'kafka02:9092' , 'kafka03:9092' ])
437+
438+ self .assertItemsEqual (
439+ [('kafka01' , 9092 ), ('kafka02' , 9092 ), ('kafka03' , 9092 )],
440+ client .hosts )
441+
442+ def test_init_with_csv (self ):
443+
444+ with patch .object (KafkaClient , 'load_metadata_for_topics' ):
445+ client = KafkaClient (
446+ hosts = 'kafka01:9092,kafka02:9092,kafka03:9092' )
447+
448+ self .assertItemsEqual (
449+ [('kafka01' , 9092 ), ('kafka02' , 9092 ), ('kafka03' , 9092 )],
450+ client .hosts )
451+
452+ def test_send_broker_unaware_request_fail (self ):
453+ 'Tests that call fails when all hosts are unavailable'
454+
455+ mocked_conns = {
456+ ('kafka01' , 9092 ): MagicMock (),
457+ ('kafka02' , 9092 ): MagicMock ()
458+ }
459+ # inject KafkaConnection side effects
460+ mocked_conns [('kafka01' , 9092 )].send .side_effect = RuntimeError ("kafka01 went away (unittest)" )
461+ mocked_conns [('kafka02' , 9092 )].send .side_effect = RuntimeError ("Kafka02 went away (unittest)" )
462+
463+ def mock_get_conn (host , port ):
464+ return mocked_conns [(host , port )]
465+
466+ # patch to avoid making requests before we want it
467+ with patch .object (KafkaClient , 'load_metadata_for_topics' ), \
468+ patch .object (KafkaClient , '_get_conn' , side_effect = mock_get_conn ):
469+
470+ client = KafkaClient (hosts = ['kafka01:9092' , 'kafka02:9092' ])
471+
472+ self .assertRaises (
473+ KafkaUnavailableError ,
474+ client ._send_broker_unaware_request ,
475+ 1 , 'fake request' )
476+
477+ for key , conn in mocked_conns .iteritems ():
478+ conn .send .assert_called_with (1 , 'fake request' )
479+
480+ def test_send_broker_unaware_request (self ):
481+ 'Tests that call works when at least one of the host is available'
482+
483+ mocked_conns = {
484+ ('kafka01' , 9092 ): MagicMock (),
485+ ('kafka02' , 9092 ): MagicMock (),
486+ ('kafka03' , 9092 ): MagicMock ()
487+ }
488+ # inject KafkaConnection side effects
489+ mocked_conns [('kafka01' , 9092 )].send .side_effect = RuntimeError ("kafka01 went away (unittest)" )
490+ mocked_conns [('kafka02' , 9092 )].recv .return_value = 'valid response'
491+ mocked_conns [('kafka03' , 9092 )].send .side_effect = RuntimeError ("kafka03 went away (unittest)" )
492+
493+ def mock_get_conn (host , port ):
494+ return mocked_conns [(host , port )]
495+
496+ # patch to avoid making requests before we want it
497+ with patch .object (KafkaClient , 'load_metadata_for_topics' ), \
498+ patch .object (KafkaClient , '_get_conn' , side_effect = mock_get_conn ):
499+
500+ client = KafkaClient (hosts = 'kafka01:9092,kafka02:9092' )
501+
502+ resp = client ._send_broker_unaware_request (1 , 'fake request' )
503+
504+ self .assertEqual ('valid response' , resp )
505+ mocked_conns [('kafka02' , 9092 )].recv .assert_called_with (1 )
506+
507+
426508if __name__ == '__main__' :
427509 unittest .main ()
0 commit comments