33import struct
44import unittest
55
6- from mock import patch
6+ from mock import MagicMock , patch
7+
78
89from kafka import KafkaClient
910from kafka .common import (
@@ -366,7 +367,6 @@ def test_encode_offset_request(self):
366367 def test_decode_offset_response (self ):
367368 pass
368369
369-
370370 @unittest .skip ("Not Implemented" )
371371 def test_encode_offset_commit_request (self ):
372372 pass
@@ -409,26 +409,22 @@ def test_init_with_csv(self):
409409 def test_send_broker_unaware_request_fail (self ):
410410 'Tests that call fails when all hosts are unavailable'
411411
412- from mock import MagicMock
413-
414412 mocked_conns = {
415413 ('kafka01' , 9092 ): MagicMock (),
416414 ('kafka02' , 9092 ): MagicMock ()
417415 }
418- # inject conns
416+ # inject KafkaConnection side effects
419417 mocked_conns [('kafka01' , 9092 )].send .side_effect = RuntimeError ("kafka01 went away (unittest)" )
420418 mocked_conns [('kafka02' , 9092 )].send .side_effect = RuntimeError ("Kafka02 went away (unittest)" )
421419
422420 def mock_get_conn (host , port ):
423- print 'mock_get_conn: %s:%d=%s' % (host , port , mocked_conns [(host , port )])
424421 return mocked_conns [(host , port )]
425422
426423 # patch to avoid making requests before we want it
427424 with patch .object (KafkaClient , 'load_metadata_for_topics' ), \
428425 patch .object (KafkaClient , '_get_conn' , side_effect = mock_get_conn ):
429426
430- client = KafkaClient (hosts = ['kafka01:9092' ,'kafka02:9092' ])
431-
427+ client = KafkaClient (hosts = ['kafka01:9092' , 'kafka02:9092' ])
432428
433429 self .assertRaises (
434430 KafkaUnavailableError ,
@@ -439,22 +435,19 @@ def mock_get_conn(host, port):
439435 conn .send .assert_called_with (1 , 'fake request' )
440436
441437 def test_send_broker_unaware_request (self ):
442- 'Tests that call fails when one of the host is available'
443-
444- from mock import MagicMock
438+ 'Tests that call works when at least one of the host is available'
445439
446440 mocked_conns = {
447441 ('kafka01' , 9092 ): MagicMock (),
448442 ('kafka02' , 9092 ): MagicMock (),
449443 ('kafka03' , 9092 ): MagicMock ()
450444 }
451- # inject conns
445+ # inject KafkaConnection side effects
452446 mocked_conns [('kafka01' , 9092 )].send .side_effect = RuntimeError ("kafka01 went away (unittest)" )
453447 mocked_conns [('kafka02' , 9092 )].recv .return_value = 'valid response'
454448 mocked_conns [('kafka03' , 9092 )].send .side_effect = RuntimeError ("kafka03 went away (unittest)" )
455449
456450 def mock_get_conn (host , port ):
457- print 'mock_get_conn: %s:%d=%s' % (host , port , mocked_conns [(host , port )])
458451 return mocked_conns [(host , port )]
459452
460453 # patch to avoid making requests before we want it
@@ -468,123 +461,6 @@ def mock_get_conn(host, port):
468461 self .assertEqual ('valid response' , resp )
469462 mocked_conns [('kafka02' , 9092 )].recv .assert_called_with (1 )
470463
471- @unittest .skip ('requires disabling recursion on load_metadata_for_topics' )
472- @patch ('kafka.client.KafkaConnection' )
473- @patch ('kafka.client.KafkaProtocol' )
474- def test_client_load_metadata (self , protocol , conn ):
475-
476- conn .recv .return_value = 'response' # anything but None
477-
478- brokers = {}
479- brokers [0 ] = BrokerMetadata (1 , 'broker_1' , 4567 )
480- brokers [1 ] = BrokerMetadata (2 , 'broker_2' , 5678 )
481-
482- topics = {}
483- topics ['topic_1' ] = {
484- 0 : PartitionMetadata ('topic_1' , 0 , 1 , [1 , 2 ], [1 , 2 ])
485- }
486- topics ['topic_2' ] = {
487- 0 : PartitionMetadata ('topic_2' , 0 , 0 , [0 , 1 ], [0 , 1 ]),
488- 1 : PartitionMetadata ('topic_2' , 1 , 1 , [1 , 0 ], [1 , 0 ])
489- }
490- protocol .decode_metadata_response .return_value = (brokers , topics )
491-
492- client = KafkaClient (hosts = 'broker_1:4567' )
493- self .assertItemsEqual (
494- {
495- TopicAndPartition ('topic_1' , 0 ): brokers [0 ],
496- TopicAndPartition ('topic_2' , 0 ): brokers [0 ],
497- TopicAndPartition ('topic_2' , 1 ): brokers [1 ]
498- },
499- client .topics_to_brokers )
500-
501- @unittest .skip ('requires disabling recursion on load_metadata_for_topics' )
502- @patch ('kafka.client.KafkaConnection' )
503- @patch ('kafka.client.KafkaProtocol' )
504- def test_client_load_metadata_unassigned_partitions (self , protocol , conn ):
505-
506- conn .recv .return_value = 'response' # anything but None
507-
508- brokers = {}
509- brokers [0 ] = BrokerMetadata (0 , 'broker_1' , 4567 )
510- brokers [1 ] = BrokerMetadata (1 , 'broker_2' , 5678 )
511-
512- topics = {}
513- topics ['topic_1' ] = {
514- 0 : PartitionMetadata ('topic_1' , 0 , - 1 , [], [])
515- }
516- protocol .decode_metadata_response .return_value = (brokers , topics )
517-
518- client = KafkaClient (hosts = 'broker_1:4567' )
519-
520- self .assertItemsEqual ({}, client .topics_to_brokers )
521- self .assertRaises (
522- Exception ,
523- client ._get_leader_for_partition ,
524- 'topic_1' , 0 )
525-
526- # calling _get_leader_for_partition (from any broker aware request)
527- # will try loading metadata again for the same topic
528- topics ['topic_1' ] = {
529- 0 : PartitionMetadata ('topic_1' , 0 , 0 , [0 , 1 ], [0 , 1 ])
530- }
531- leader = client ._get_leader_for_partition ('topic_1' , 0 )
532-
533- self .assertEqual (brokers [0 ], leader )
534- self .assertItemsEqual (
535- {
536- TopicAndPartition ('topic_1' , 0 ): brokers [0 ],
537- },
538- client .topics_to_brokers )
539-
540- @unittest .skip ('requires disabling recursion on load_metadata_for_topics' )
541- @patch ('kafka.client.KafkaConnection' )
542- @patch ('kafka.client.KafkaProtocol' )
543- def test_client_load_metadata_noleader_partitions (self , protocol , conn ):
544-
545- conn .recv .return_value = 'response' # anything but None
546-
547- brokers = {}
548- brokers [0 ] = BrokerMetadata (0 , 'broker_1' , 4567 )
549- brokers [1 ] = BrokerMetadata (1 , 'broker_2' , 5678 )
550-
551- topics = {}
552- topics ['topic_1' ] = {
553- 0 : PartitionMetadata ('topic_1' , 0 , - 1 , [], [])
554- }
555- topics ['topic_2' ] = {
556- 0 : PartitionMetadata ('topic_2' , 0 , 0 , [0 , 1 ], []),
557- 1 : PartitionMetadata ('topic_2' , 1 , 1 , [1 , 0 ], [1 , 0 ])
558- }
559- protocol .decode_metadata_response .return_value = (brokers , topics )
560-
561- client = KafkaClient (hosts = 'broker_1:4567' )
562- self .assertItemsEqual (
563- {
564- TopicAndPartition ('topic_2' , 0 ): brokers [0 ],
565- TopicAndPartition ('topic_2' , 1 ): brokers [1 ]
566- },
567- client .topics_to_brokers )
568- self .assertRaises (
569- Exception ,
570- client ._get_leader_for_partition ,
571- 'topic_1' , 0 )
572-
573- # calling _get_leader_for_partition (from any broker aware request)
574- # will try loading metadata again for the same topic
575- topics ['topic_1' ] = {
576- 0 : PartitionMetadata ('topic_1' , 0 , 0 , [0 , 1 ], [0 , 1 ])
577- }
578- leader = client ._get_leader_for_partition ('topic_1' , 0 )
579-
580- self .assertEqual (brokers [0 ], leader )
581- self .assertItemsEqual (
582- {
583- TopicAndPartition ('topic_1' , 0 ): brokers [0 ],
584- TopicAndPartition ('topic_2' , 0 ): brokers [0 ],
585- TopicAndPartition ('topic_2' , 1 ): brokers [1 ]
586- },
587- client .topics_to_brokers )
588464
589465if __name__ == '__main__' :
590466 unittest .main ()
0 commit comments