@@ -770,6 +770,113 @@ def test_large_messages(self):
770
770
self .assertEquals (all_messages [i ], message .message )
771
771
self .assertEquals (i , 19 )
772
772
773
+ class TestFailover (unittest .TestCase ):
774
+
775
+ @classmethod
776
+ def setUpClass (cls ):
777
+
778
+ zk_chroot = random_string (10 )
779
+ replicas = 2
780
+ partitions = 2
781
+
782
+ # mini zookeeper, 2 kafka brokers
783
+ cls .zk = ZookeeperFixture .instance ()
784
+ kk_args = [cls .zk .host , cls .zk .port , zk_chroot , replicas , partitions ]
785
+ cls .brokers = [KafkaFixture .instance (i , * kk_args ) for i in range (replicas )]
786
+ cls .client = KafkaClient (cls .brokers [0 ].host , cls .brokers [0 ].port )
787
+
788
+ @classmethod
789
+ def tearDownClass (cls ):
790
+ cls .client .close ()
791
+ for broker in cls .brokers :
792
+ broker .close ()
793
+ cls .zk .close ()
794
+
795
+ def test_switch_leader (self ):
796
+
797
+ key , topic , partition = random_string (5 ), 'test_switch_leader' , 0
798
+ producer = SimpleProducer (self .client , topic )
799
+
800
+ for i in range (1 , 4 ):
801
+
802
+ # XXX unfortunately, the conns dict needs to be warmed for this to work
803
+ # XXX unfortunately, for warming to work, we need at least as many partitions as brokers
804
+ self ._send_random_messages (producer , 10 )
805
+
806
+ # kil leader for partition 0
807
+ broker = self ._kill_leader (topic , partition )
808
+
809
+ # expect failure, reload meta data
810
+ with self .assertRaises (FailedPayloadsException ):
811
+ producer .send_messages ('part 1' )
812
+ producer .send_messages ('part 2' )
813
+ time .sleep (1 )
814
+
815
+ # send to new leader
816
+ self ._send_random_messages (producer , 10 )
817
+
818
+ broker .open ()
819
+ time .sleep (3 )
820
+
821
+ # count number of messages
822
+ count = self ._count_messages ('test_switch_leader group %s' % i , topic )
823
+ self .assertIn (count , range (20 * i , 22 * i + 1 ))
824
+
825
+ producer .stop ()
826
+
827
+ def test_switch_leader_async (self ):
828
+
829
+ key , topic , partition = random_string (5 ), 'test_switch_leader_async' , 0
830
+ producer = SimpleProducer (self .client , topic , async = True )
831
+
832
+ for i in range (1 , 4 ):
833
+
834
+ self ._send_random_messages (producer , 10 )
835
+
836
+ # kil leader for partition 0
837
+ broker = self ._kill_leader (topic , partition )
838
+
839
+ # expect failure, reload meta data
840
+ producer .send_messages ('part 1' )
841
+ producer .send_messages ('part 2' )
842
+ time .sleep (1 )
843
+
844
+ # send to new leader
845
+ self ._send_random_messages (producer , 10 )
846
+
847
+ broker .open ()
848
+ time .sleep (3 )
849
+
850
+ # count number of messages
851
+ count = self ._count_messages ('test_switch_leader_async group %s' % i , topic )
852
+ self .assertIn (count , range (20 * i , 22 * i + 1 ))
853
+
854
+ producer .stop ()
855
+
856
+ def _send_random_messages (self , producer , n ):
857
+ for j in range (n ):
858
+ resp = producer .send_messages (random_string (10 ))
859
+ if len (resp ) > 0 :
860
+ self .assertEquals (resp [0 ].error , 0 )
861
+ time .sleep (1 ) # give it some time
862
+
863
+ def _kill_leader (self , topic , partition ):
864
+ leader = self .client .topics_to_brokers [TopicAndPartition (topic , partition )]
865
+ broker = self .brokers [leader .nodeId ]
866
+ broker .close ()
867
+ time .sleep (1 ) # give it some time
868
+ return broker
869
+
870
+ def _count_messages (self , group , topic ):
871
+ client = KafkaClient (self .brokers [0 ].host , self .brokers [0 ].port )
872
+ consumer = SimpleConsumer (client , group , topic , auto_commit = False )
873
+ all_messages = []
874
+ for message in consumer :
875
+ all_messages .append (message )
876
+ consumer .stop ()
877
+ client .close ()
878
+ return len (all_messages )
879
+
773
880
774
881
def random_string (l ):
775
882
s = "" .join (random .choice (string .letters ) for i in xrange (l ))
0 commit comments