|
12 | 12 | from .fixtures import ZookeeperFixture, KafkaFixture |
13 | 13 |
|
14 | 14 |
|
| 15 | +def ensure_topic_creation(client, topic_name): |
| 16 | + times = 0 |
| 17 | + while True: |
| 18 | + times += 1 |
| 19 | + client.load_metadata_for_topics(topic_name) |
| 20 | + if client.has_metadata_for_topic(topic_name): |
| 21 | + break |
| 22 | + print "Waiting for %s topic to be created" % topic_name |
| 23 | + time.sleep(1) |
| 24 | + |
| 25 | + if times > 30: |
| 26 | + raise Exception("Unable to create topic %s" % topic_name) |
| 27 | + |
| 28 | + |
15 | 29 | class KafkaTestCase(unittest.TestCase): |
16 | 30 | def setUp(self): |
17 | 31 | topic_name = self.id()[self.id().rindex(".")+1:] |
18 | | - times = 0 |
19 | | - while True: |
20 | | - times += 1 |
21 | | - self.client.load_metadata_for_topics(topic_name) |
22 | | - if self.client.has_metadata_for_topic(topic_name): |
23 | | - break |
24 | | - print "Waiting for %s topic to be created" % topic_name |
25 | | - time.sleep(1) |
26 | | - |
27 | | - if times > 30: |
28 | | - raise Exception("Unable to create topic %s" % topic_name) |
| 32 | + ensure_topic_creation(self.client, topic_name) |
29 | 33 |
|
30 | 34 |
|
31 | 35 | class TestKafkaClient(KafkaTestCase): |
@@ -719,7 +723,7 @@ def test_multi_process_consumer(self): |
719 | 723 | start = datetime.now() |
720 | 724 | messages = consumer.get_messages(block=True, timeout=5) |
721 | 725 | diff = (datetime.now() - start).total_seconds() |
722 | | - self.assertGreaterEqual(diff, 4.9) |
| 726 | + self.assertGreaterEqual(diff, 4.999) |
723 | 727 | self.assertEqual(len(messages), 0) |
724 | 728 |
|
725 | 729 | # Send 10 messages |
@@ -830,7 +834,7 @@ def setUp(self): |
830 | 834 | kk_args = [self.zk.host, self.zk.port, zk_chroot, replicas, partitions] |
831 | 835 | self.brokers = [KafkaFixture.instance(i, *kk_args) for i in range(replicas)] |
832 | 836 | self.client = KafkaClient(self.brokers[0].host, self.brokers[0].port) |
833 | | - KafkaTestCase.setUp(self) |
| 837 | + super(TestFailover, self).setUp() |
834 | 838 |
|
835 | 839 | def tearDown(self): |
836 | 840 | self.client.close() |
|
0 commit comments