@@ -26,7 +26,7 @@ import scala.concurrent.duration._
2626import scala .language .postfixOps
2727import scala .util .Random
2828
29- import kafka .admin .CreateTopicCommand
29+ import kafka .admin .AdminUtils
3030import kafka .common .{KafkaException , TopicAndPartition }
3131import kafka .producer .{KeyedMessage , Producer , ProducerConfig }
3232import kafka .serializer .{StringDecoder , StringEncoder }
@@ -77,8 +77,7 @@ abstract class KafkaStreamSuiteBase extends FunSuite with Eventually with Loggin
7777 var bindSuccess : Boolean = false
7878 while (! bindSuccess) {
7979 try {
80- val brokerProps = getBrokerConfig()
81- brokerConf = new KafkaConfig (brokerProps)
80+ brokerConf = new KafkaConfig (brokerConfig)
8281 server = new KafkaServer (brokerConf)
8382 logInfo(" ==================== 2 ====================" )
8483 server.startup()
@@ -123,27 +122,26 @@ abstract class KafkaStreamSuiteBase extends FunSuite with Eventually with Loggin
123122
124123 private def createTestMessage (topic : String , sent : Map [String , Int ])
125124 : Seq [KeyedMessage [String , String ]] = {
126- val messages = for ((s, freq) <- sent; i <- 0 until freq) yield {
125+ ( for ((s, freq) <- sent; i <- 0 until freq) yield {
127126 new KeyedMessage [String , String ](topic, s)
128- }
129- messages.toSeq
127+ }).toSeq
130128 }
131129
132130 def createTopic (topic : String ) {
133- CreateTopicCommand .createTopic(zkClient, topic, 1 , 1 , " 0 " )
131+ AdminUtils .createTopic(zkClient, topic, 1 , 1 )
134132 logInfo(" ==================== 5 ====================" )
135133 // wait until metadata is propagated
136- waitUntilMetadataIsPropagated(topic, 0 )
134+ waitUntilMetadataIsPropagated(Seq (server), topic, 0 )
137135 }
138136
139137 def produceAndSendMessage (topic : String , sent : Map [String , Int ]) {
140- producer = new Producer [String , String ](new ProducerConfig (getProducerConfig() ))
138+ producer = new Producer [String , String ](new ProducerConfig (producerConfig ))
141139 producer.send(createTestMessage(topic, sent): _* )
142140 producer.close()
143141 logInfo(" ==================== 6 ====================" )
144142 }
145143
146- private def getBrokerConfig () : Properties = {
144+ private def brokerConfig : Properties = {
147145 val props = new Properties ()
148146 props.put(" broker.id" , " 0" )
149147 props.put(" host.name" , " localhost" )
@@ -155,21 +153,29 @@ abstract class KafkaStreamSuiteBase extends FunSuite with Eventually with Loggin
155153 props
156154 }
157155
158- private def getProducerConfig () : Properties = {
156+ private def producerConfig : Properties = {
159157 val brokerAddr = brokerConf.hostName + " :" + brokerConf.port
160158 val props = new Properties ()
161159 props.put(" metadata.broker.list" , brokerAddr)
162160 props.put(" serializer.class" , classOf [StringEncoder ].getName)
163161 props
164162 }
165163
166- private def waitUntilMetadataIsPropagated (topic : String , partition : Int ) {
164+ private def waitUntilMetadataIsPropagated (servers : Seq [KafkaServer ], topic : String , partition : Int ): Int = {
165+ var leader : Int = - 1
167166 eventually(timeout(1000 milliseconds), interval(100 milliseconds)) {
168- assert(
169- server.apis.leaderCache.keySet.contains(TopicAndPartition (topic, partition)),
170- s " Partition [ $topic, $partition] metadata not propagated after timeout "
171- )
167+ assert(servers.foldLeft(true ) {
168+ (result, server) =>
169+ val partitionStateOpt = server.apis.metadataCache.getPartitionInfo(topic, partition)
170+ partitionStateOpt match {
171+ case None => false
172+ case Some (partitionState) =>
173+ leader = partitionState.leaderIsrAndControllerEpoch.leaderAndIsr.leader
174+ result && leader >= 0 // is valid broker id
175+ }
176+ }, s " Partition [ $topic, $partition] metadata not propagated after timeout " )
172177 }
178+ leader
173179 }
174180
175181 class EmbeddedZookeeper (val zkConnect : String ) {
0 commit comments