|
7 | 7 | from kafka import KafkaClient, SimpleConsumer |
8 | 8 | from kafka.common import TopicAndPartition, FailedPayloadsError, ConnectionError |
9 | 9 | from kafka.producer.base import Producer |
| 10 | +from kafka.producer import KeyedProducer |
10 | 11 |
|
11 | 12 | from test.fixtures import ZookeeperFixture, KafkaFixture |
12 | 13 | from test.testutil import ( |
@@ -116,6 +117,45 @@ def test_switch_leader_async(self): |
116 | 117 | # Should be equal to 10 before + 1 recovery + 10 after |
117 | 118 | self.assert_message_count(topic, 21, partitions=(partition,)) |
118 | 119 |
|
| 120 | + @kafka_versions("all") |
| 121 | + def test_switch_leader_keyed_producer(self): |
| 122 | + topic = self.topic |
| 123 | + |
| 124 | + producer = KeyedProducer(self.client, async=False) |
| 125 | + |
| 126 | + # Send 10 random messages |
| 127 | + for _ in range(10): |
| 128 | + key = random_string(3) |
| 129 | + msg = random_string(10) |
| 130 | + producer.send_messages(topic, key, msg) |
| 131 | + |
| 132 | + # kill leader for partition 0 |
| 133 | + self._kill_leader(topic, 0) |
| 134 | + |
| 135 | + recovered = False |
| 136 | + started = time.time() |
| 137 | + timeout = 60 |
| 138 | + while not recovered and (time.time() - started) < timeout: |
| 139 | + try: |
| 140 | + key = random_string(3) |
| 141 | + msg = random_string(10) |
| 142 | + producer.send_messages(topic, key, msg) |
| 143 | + if producer.partitioners[topic].partition(key) == 0: |
| 144 | + recovered = True |
| 145 | + except (FailedPayloadsError, ConnectionError): |
| 146 | + logging.debug("caught exception sending message -- will retry") |
| 147 | + continue |
| 148 | + |
| 149 | + # Verify we successfully sent the message |
| 150 | + self.assertTrue(recovered) |
| 151 | + |
| 152 | + # send some more messages just to make sure no more exceptions |
| 153 | + for _ in range(10): |
| 154 | + key = random_string(3) |
| 155 | + msg = random_string(10) |
| 156 | + producer.send_messages(topic, key, msg) |
| 157 | + |
| 158 | + |
119 | 159 | def _send_random_messages(self, producer, topic, partition, n): |
120 | 160 | for j in range(n): |
121 | 161 | logging.debug('_send_random_message to %s:%d -- try %d', topic, partition, j) |
|
0 commit comments