diff --git a/pom.xml b/pom.xml index c03d44f..d7302f3 100644 --- a/pom.xml +++ b/pom.xml @@ -8,17 +8,22 @@ kafka-topic-client 1.0-SNAPSHOT + + 1.8 + 1.8 + + org.apache.kafka - kafka_2.11 - 0.9.0.0 + kafka_2.12 + 1.0.0 com.101tec zkclient - 0.7 + 0.10 @@ -49,4 +54,4 @@ - \ No newline at end of file + diff --git a/src/main/java/Client.java b/src/main/java/Client.java index 91a9e30..010107b 100644 --- a/src/main/java/Client.java +++ b/src/main/java/Client.java @@ -6,6 +6,8 @@ import java.util.concurrent.TimeUnit; import kafka.admin.AdminUtils; +import kafka.admin.RackAwareMode; +import kafka.admin.RackAwareMode.Safe$; import kafka.utils.ZKStringSerializer$; import kafka.utils.ZkUtils; @@ -15,6 +17,7 @@ public class Client { final static boolean resetTopic = Boolean.parseBoolean(System.getenv("RESET_TOPIC")); final static int partitions = Integer.parseInt(System.getenv("NUM_PARTITIONS")); final static int replication = Integer.parseInt(System.getenv("NUM_REPLICAS")); + final static RackAwareMode rackAwareMode = Safe$.MODULE$; final static int nRetries = Integer.parseInt(System.getenv("NUM_CREATE_RETRIES")); @@ -64,7 +67,7 @@ private static void tryCreate(ZkUtils zkUtils, String topicName, int nRetriesLef System.out.println("Creating topic " + topicName); Properties topicConfig = new Properties(); // add per-topic configurations settings here try { - AdminUtils.createTopic(zkUtils, topicName, partitions, replication, topicConfig); + AdminUtils.createTopic(zkUtils, topicName, partitions, replication, topicConfig, rackAwareMode); } catch (Exception e) { if (nRetriesLeft <= 0) { throw new RuntimeException("Failed to create topic \"" + topicName + "\". Is Kafka and Zookeeper running?");