@@ -5,14 +5,14 @@
 
 from mock import MagicMock, patch
 
-
 from kafka import KafkaClient
 from kafka.common import (
     ProduceRequest, FetchRequest, Message, ChecksumError,
     ConsumerFetchSizeTooSmall, ProduceResponse, FetchResponse,
-    OffsetAndMessage, BrokerMetadata, PartitionMetadata
+    OffsetAndMessage, BrokerMetadata, PartitionMetadata,
+    TopicAndPartition, KafkaUnavailableError,
+    LeaderUnavailableError, PartitionUnavailableError
 )
-from kafka.common import KafkaUnavailableError
 from kafka.codec import (
     has_gzip, has_snappy, gzip_encode, gzip_decode,
     snappy_encode, snappy_decode
@@ -410,6 +410,7 @@ def test_encode_offset_request(self):
     def test_decode_offset_response(self):
         pass
 
+
     @unittest.skip("Not Implemented")
     def test_encode_offset_commit_request(self):
         pass
@@ -474,18 +475,17 @@ def mock_get_conn(host, port):
             return mocked_conns[(host, port)]
 
         # patch to avoid making requests before we want it
-        with patch.object(KafkaClient, 'load_metadata_for_topics'), \
-                patch.object(KafkaClient, '_get_conn', side_effect=mock_get_conn):
-
-            client = KafkaClient(hosts=['kafka01:9092', 'kafka02:9092'])
+        with patch.object(KafkaClient, 'load_metadata_for_topics'):
+            with patch.object(KafkaClient, '_get_conn', side_effect=mock_get_conn):
+                client = KafkaClient(hosts=['kafka01:9092', 'kafka02:9092'])
 
-        self.assertRaises(
-            KafkaUnavailableError,
-            client._send_broker_unaware_request,
-            1, 'fake request')
+                self.assertRaises(
+                    KafkaUnavailableError,
+                    client._send_broker_unaware_request,
+                    1, 'fake request')
 
-        for key, conn in mocked_conns.iteritems():
-            conn.send.assert_called_with(1, 'fake request')
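+                # the send should have been attempted on every mocked connection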
+                for key, conn in mocked_conns.iteritems():
+                    conn.send.assert_called_with(1, 'fake request')
 
     def test_send_broker_unaware_request(self):
         'Tests that a call works when at least one of the hosts is available'
@@ -504,16 +504,171 @@ def mock_get_conn(host, port):
             return mocked_conns[(host, port)]
 
         # patch to avoid making requests before we want it
-        with patch.object(KafkaClient, 'load_metadata_for_topics'), \
-                patch.object(KafkaClient, '_get_conn', side_effect=mock_get_conn):
+        with patch.object(KafkaClient, 'load_metadata_for_topics'):
+            with patch.object(KafkaClient, '_get_conn', side_effect=mock_get_conn):
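+                # hosts can also be passed as a single comma-separated string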
+                client = KafkaClient(hosts='kafka01:9092,kafka02:9092')
+
+                resp = client._send_broker_unaware_request(1, 'fake request')
+
+                self.assertEqual('valid response', resp)
+                mocked_conns[('kafka02', 9092)].recv.assert_called_with(1)
+
+    @patch('kafka.client.KafkaConnection')
+    @patch('kafka.client.KafkaProtocol')
+    def test_load_metadata(self, protocol, conn):
+        "Load metadata for all topics"
+
+        conn.recv.return_value = 'response'  # anything but None
+
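+        # mock out a metadata response with two brokers and several
+        # topics in different states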
+        brokers = {}
+        brokers[0] = BrokerMetadata(1, 'broker_1', 4567)
+        brokers[1] = BrokerMetadata(2, 'broker_2', 5678)
+
+        topics = {}
+        topics['topic_1'] = {
+            0: PartitionMetadata('topic_1', 0, 1, [1, 2], [1, 2])
+        }
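+        # a leader of -1 means the partition currently has no leader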
+        topics['topic_noleader'] = {
+            0: PartitionMetadata('topic_noleader', 0, -1, [], []),
+            1: PartitionMetadata('topic_noleader', 1, -1, [], [])
+        }
+        topics['topic_no_partitions'] = {}
+        topics['topic_3'] = {
+            0: PartitionMetadata('topic_3', 0, 0, [0, 1], [0, 1]),
+            1: PartitionMetadata('topic_3', 1, 1, [1, 0], [1, 0]),
+            2: PartitionMetadata('topic_3', 2, 0, [0, 1], [0, 1])
+        }
+        protocol.decode_metadata_response.return_value = (brokers, topics)
+
+        # client loads metadata at init
+        client = KafkaClient(hosts=['broker_1:4567'])
+        self.assertDictEqual({
+            TopicAndPartition('topic_1', 0): brokers[1],
+            TopicAndPartition('topic_noleader', 0): None,
+            TopicAndPartition('topic_noleader', 1): None,
+            TopicAndPartition('topic_3', 0): brokers[0],
+            TopicAndPartition('topic_3', 1): brokers[1],
+            TopicAndPartition('topic_3', 2): brokers[0]},
+            client.topics_to_brokers)
+
+    @patch('kafka.client.KafkaConnection')
+    @patch('kafka.client.KafkaProtocol')
+    def test_get_leader_for_partitions_reloads_metadata(self, protocol, conn):
+        "Get leader for partitions reloads metadata if it is not available"
+
+        conn.recv.return_value = 'response'  # anything but None
+
+        brokers = {}
+        brokers[0] = BrokerMetadata(0, 'broker_1', 4567)
+        brokers[1] = BrokerMetadata(1, 'broker_2', 5678)
+
+        topics = {'topic_no_partitions': {}}
+        protocol.decode_metadata_response.return_value = (brokers, topics)
+
+        client = KafkaClient(hosts=['broker_1:4567'])
+
+        # topic metadata is loaded but empty
+        self.assertDictEqual({}, client.topics_to_brokers)
+
+        topics['topic_no_partitions'] = {
+            0: PartitionMetadata('topic_no_partitions', 0, 0, [0, 1], [0, 1])
+        }
+        protocol.decode_metadata_response.return_value = (brokers, topics)
+
+        # calling _get_leader_for_partition (from any broker aware request)
+        # will try loading metadata again for the same topic
+        leader = client._get_leader_for_partition('topic_no_partitions', 0)
+
+        self.assertEqual(brokers[0], leader)
+        self.assertDictEqual({
+            TopicAndPartition('topic_no_partitions', 0): brokers[0]},
+            client.topics_to_brokers)
+
+    @patch('kafka.client.KafkaConnection')
+    @patch('kafka.client.KafkaProtocol')
+    def test_get_leader_for_unassigned_partitions(self, protocol, conn):
+        "Get leader raises if no partitions are defined for a topic"
+
+        conn.recv.return_value = 'response'  # anything but None
+
+        brokers = {}
+        brokers[0] = BrokerMetadata(0, 'broker_1', 4567)
+        brokers[1] = BrokerMetadata(1, 'broker_2', 5678)
+
+        topics = {'topic_no_partitions': {}}
+        protocol.decode_metadata_response.return_value = (brokers, topics)
+
+        client = KafkaClient(hosts=['broker_1:4567'])
+
+        self.assertDictEqual({}, client.topics_to_brokers)
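+        # with no partitions in the metadata, a leader lookup should raise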
+        self.assertRaises(
+            PartitionUnavailableError,
+            client._get_leader_for_partition,
+            'topic_no_partitions', 0)
+
+    @patch('kafka.client.KafkaConnection')
+    @patch('kafka.client.KafkaProtocol')
+    def test_get_leader_returns_none_when_noleader(self, protocol, conn):
+        "Getting leader for partitions returns None when the partition has no leader"
+
+        conn.recv.return_value = 'response'  # anything but None
 
-            client = KafkaClient(hosts='kafka01:9092,kafka02:9092')
+        brokers = {}
+        brokers[0] = BrokerMetadata(0, 'broker_1', 4567)
+        brokers[1] = BrokerMetadata(1, 'broker_2', 5678)
+
+        topics = {}
+        topics['topic_noleader'] = {
+            0: PartitionMetadata('topic_noleader', 0, -1, [], []),
+            1: PartitionMetadata('topic_noleader', 1, -1, [], [])
+        }
+        protocol.decode_metadata_response.return_value = (brokers, topics)
+
+        client = KafkaClient(hosts=['broker_1:4567'])
+        self.assertDictEqual(
+            {
+                TopicAndPartition('topic_noleader', 0): None,
+                TopicAndPartition('topic_noleader', 1): None
+            },
+            client.topics_to_brokers)
+        self.assertIsNone(client._get_leader_for_partition('topic_noleader', 0))
+        self.assertIsNone(client._get_leader_for_partition('topic_noleader', 1))
+
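+        # once leaders are assigned, lookups should return the brokers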
+        topics['topic_noleader'] = {
+            0: PartitionMetadata('topic_noleader', 0, 0, [0, 1], [0, 1]),
+            1: PartitionMetadata('topic_noleader', 1, 1, [1, 0], [1, 0])
+        }
+        protocol.decode_metadata_response.return_value = (brokers, topics)
+        self.assertEqual(brokers[0], client._get_leader_for_partition('topic_noleader', 0))
+        self.assertEqual(brokers[1], client._get_leader_for_partition('topic_noleader', 1))
+
+    @patch('kafka.client.KafkaConnection')
+    @patch('kafka.client.KafkaProtocol')
+    def test_send_produce_request_raises_when_noleader(self, protocol, conn):
+        "Send produce request raises LeaderUnavailableError if the leader is not available"
+
+        conn.recv.return_value = 'response'  # anything but None
+
+        brokers = {}
+        brokers[0] = BrokerMetadata(0, 'broker_1', 4567)
+        brokers[1] = BrokerMetadata(1, 'broker_2', 5678)
+
+        topics = {}
+        topics['topic_noleader'] = {
+            0: PartitionMetadata('topic_noleader', 0, -1, [], []),
+            1: PartitionMetadata('topic_noleader', 1, -1, [], [])
+        }
+        protocol.decode_metadata_response.return_value = (brokers, topics)
 
-            resp = client._send_broker_unaware_request(1, 'fake request')
+        client = KafkaClient(hosts=['broker_1:4567'])
 
-            self.assertEqual('valid response', resp)
-            mocked_conns[('kafka02', 9092)].recv.assert_called_with(1)
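+        # producing to a partition with no leader should fail fast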
+        requests = [ProduceRequest(
+            "topic_noleader", 0,
+            [create_message("a"), create_message("b")])]
 
+        self.assertRaises(
+            LeaderUnavailableError,
+            client.send_produce_request, requests)
 
 if __name__ == '__main__':
     unittest.main()