@@ -26,7 +26,7 @@ import scala.concurrent.duration._
2626import scala .language .postfixOps
2727import scala .util .Random
2828
29- import kafka .admin .CreateTopicCommand
29+ import kafka .admin .AdminUtils
3030import kafka .common .{KafkaException , TopicAndPartition }
3131import kafka .producer .{KeyedMessage , Producer , ProducerConfig }
3232import kafka .serializer .{StringDecoder , StringEncoder }
@@ -130,7 +130,7 @@ abstract class KafkaStreamSuiteBase extends FunSuite with Eventually with Loggin
130130 }
131131
132132 def createTopic (topic : String ) {
133- CreateTopicCommand .createTopic(zkClient, topic, 1 , 1 , " 0 " )
133+ AdminUtils .createTopic(zkClient, topic, 1 , 1 )
134134 logInfo(" ==================== 5 ====================" )
135135 // wait until metadata is propagated
136136 waitUntilMetadataIsPropagated(topic, 0 )
@@ -166,7 +166,7 @@ abstract class KafkaStreamSuiteBase extends FunSuite with Eventually with Loggin
166166 private def waitUntilMetadataIsPropagated (topic : String , partition : Int ) {
167167 eventually(timeout(1000 milliseconds), interval(100 milliseconds)) {
168168 assert(
169- server.apis.leaderCache.keySet.contains( TopicAndPartition ( topic, partition) ),
169+ server.apis.metadataCache.containsTopicAndPartition( topic, partition),
170170 s " Partition [ $topic, $partition] metadata not propagated after timeout "
171171 )
172172 }
0 commit comments