Skip to content

Commit 80784a1

Browse files
committed
[SPARK-18057][FOLLOW-UP] Use 127.0.0.1 to avoid zookeeper picking up an ipv6 address
## What changes were proposed in this pull request? I'm still seeing the Kafka tests failed randomly due to `kafka.zookeeper.ZooKeeperClientTimeoutException: Timed out waiting for connection while in state: CONNECTING`. I checked the test output and saw zookeeper picked up an ipv6 address. Most details can be found in https://issues.apache.org/jira/browse/KAFKA-7193 This PR just uses `127.0.0.1` rather than `localhost` to make sure zookeeper will never use an ipv6 address. ## How was this patch tested? Jenkins Closes #22097 from zsxwing/fix-zookeeper-connect. Authored-by: Shixiong Zhu <[email protected]> Signed-off-by: Shixiong Zhu <[email protected]>
1 parent 42263fd commit 80784a1

File tree

2 files changed

+96
-63
lines changed

2 files changed

+96
-63
lines changed

external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala

Lines changed: 48 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ import org.apache.kafka.clients.producer._
3939
import org.apache.kafka.common.TopicPartition
4040
import org.apache.kafka.common.network.ListenerName
4141
import org.apache.kafka.common.serialization.{StringDeserializer, StringSerializer}
42+
import org.apache.kafka.common.utils.Exit
4243
import org.apache.zookeeper.server.{NIOServerCnxnFactory, ZooKeeperServer}
4344
import org.scalatest.concurrent.Eventually._
4445
import org.scalatest.time.SpanSugar._
@@ -56,7 +57,7 @@ import org.apache.spark.util.Utils
5657
class KafkaTestUtils(withBrokerProps: Map[String, Object] = Map.empty) extends Logging {
5758

5859
// Zookeeper related configurations
59-
private val zkHost = "localhost"
60+
private val zkHost = "127.0.0.1"
6061
private var zkPort: Int = 0
6162
private val zkConnectionTimeout = 60000
6263
private val zkSessionTimeout = 6000
@@ -67,7 +68,7 @@ class KafkaTestUtils(withBrokerProps: Map[String, Object] = Map.empty) extends L
6768
private var adminClient: AdminClient = null
6869

6970
// Kafka broker related configurations
70-
private val brokerHost = "localhost"
71+
private val brokerHost = "127.0.0.1"
7172
private var brokerPort = 0
7273
private var brokerConf: KafkaConfig = _
7374

@@ -138,40 +139,55 @@ class KafkaTestUtils(withBrokerProps: Map[String, Object] = Map.empty) extends L
138139

139140
/** Teardown the whole servers, including Kafka broker and Zookeeper */
140141
def teardown(): Unit = {
141-
brokerReady = false
142-
zkReady = false
143-
144-
if (producer != null) {
145-
producer.close()
146-
producer = null
142+
// There is a race condition that may kill JVM when terminating the Kafka cluster. We set
143+
// a custom Procedure here during the termination in order to keep JVM running and not fail the
144+
// tests.
145+
val logExitEvent = new Exit.Procedure {
146+
override def execute(statusCode: Int, message: String): Unit = {
147+
logError(s"Prevent Kafka from killing JVM (statusCode: $statusCode message: $message)")
148+
}
147149
}
150+
Exit.setExitProcedure(logExitEvent)
151+
Exit.setHaltProcedure(logExitEvent)
152+
try {
153+
brokerReady = false
154+
zkReady = false
148155

149-
if (server != null) {
150-
server.shutdown()
151-
server.awaitShutdown()
152-
server = null
153-
}
156+
if (producer != null) {
157+
producer.close()
158+
producer = null
159+
}
154160

155-
// On Windows, `logDirs` is left open even after Kafka server above is completely shut down
156-
// in some cases. It leads to test failures on Windows if the directory deletion failure
157-
// throws an exception.
158-
brokerConf.logDirs.foreach { f =>
159-
try {
160-
Utils.deleteRecursively(new File(f))
161-
} catch {
162-
case e: IOException if Utils.isWindows =>
163-
logWarning(e.getMessage)
161+
if (server != null) {
162+
server.shutdown()
163+
server.awaitShutdown()
164+
server = null
164165
}
165-
}
166166

167-
if (zkUtils != null) {
168-
zkUtils.close()
169-
zkUtils = null
170-
}
167+
// On Windows, `logDirs` is left open even after Kafka server above is completely shut down
168+
// in some cases. It leads to test failures on Windows if the directory deletion failure
169+
// throws an exception.
170+
brokerConf.logDirs.foreach { f =>
171+
try {
172+
Utils.deleteRecursively(new File(f))
173+
} catch {
174+
case e: IOException if Utils.isWindows =>
175+
logWarning(e.getMessage)
176+
}
177+
}
171178

172-
if (zookeeper != null) {
173-
zookeeper.shutdown()
174-
zookeeper = null
179+
if (zkUtils != null) {
180+
zkUtils.close()
181+
zkUtils = null
182+
}
183+
184+
if (zookeeper != null) {
185+
zookeeper.shutdown()
186+
zookeeper = null
187+
}
188+
} finally {
189+
Exit.resetExitProcedure()
190+
Exit.resetHaltProcedure()
175191
}
176192
}
177193

@@ -299,8 +315,8 @@ class KafkaTestUtils(withBrokerProps: Map[String, Object] = Map.empty) extends L
299315
protected def brokerConfiguration: Properties = {
300316
val props = new Properties()
301317
props.put("broker.id", "0")
302-
props.put("host.name", "localhost")
303-
props.put("advertised.host.name", "localhost")
318+
props.put("host.name", "127.0.0.1")
319+
props.put("advertised.host.name", "127.0.0.1")
304320
props.put("port", brokerPort.toString)
305321
props.put("log.dir", Utils.createTempDir().getAbsolutePath)
306322
props.put("zookeeper.connect", zkAddress)

external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaTestUtils.scala

Lines changed: 48 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ import kafka.utils.ZkUtils
3434
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
3535
import org.apache.kafka.common.network.ListenerName
3636
import org.apache.kafka.common.serialization.StringSerializer
37+
import org.apache.kafka.common.utils.Exit
3738
import org.apache.zookeeper.server.{NIOServerCnxnFactory, ZooKeeperServer}
3839

3940
import org.apache.spark.SparkConf
@@ -50,7 +51,7 @@ import org.apache.spark.util.Utils
5051
private[kafka010] class KafkaTestUtils extends Logging {
5152

5253
// Zookeeper related configurations
53-
private val zkHost = "localhost"
54+
private val zkHost = "127.0.0.1"
5455
private var zkPort: Int = 0
5556
private val zkConnectionTimeout = 60000
5657
private val zkSessionTimeout = 6000
@@ -60,7 +61,7 @@ private[kafka010] class KafkaTestUtils extends Logging {
6061
private var zkUtils: ZkUtils = _
6162

6263
// Kafka broker related configurations
63-
private val brokerHost = "localhost"
64+
private val brokerHost = "127.0.0.1"
6465
private var brokerPort = 0
6566
private var brokerConf: KafkaConfig = _
6667

@@ -125,40 +126,55 @@ private[kafka010] class KafkaTestUtils extends Logging {
125126

126127
/** Teardown the whole servers, including Kafka broker and Zookeeper */
127128
def teardown(): Unit = {
128-
brokerReady = false
129-
zkReady = false
130-
131-
if (producer != null) {
132-
producer.close()
133-
producer = null
129+
// There is a race condition that may kill JVM when terminating the Kafka cluster. We set
130+
// a custom Procedure here during the termination in order to keep JVM running and not fail the
131+
// tests.
132+
val logExitEvent = new Exit.Procedure {
133+
override def execute(statusCode: Int, message: String): Unit = {
134+
logError(s"Prevent Kafka from killing JVM (statusCode: $statusCode message: $message)")
135+
}
134136
}
137+
Exit.setExitProcedure(logExitEvent)
138+
Exit.setHaltProcedure(logExitEvent)
139+
try {
140+
brokerReady = false
141+
zkReady = false
142+
143+
if (producer != null) {
144+
producer.close()
145+
producer = null
146+
}
135147

136-
if (server != null) {
137-
server.shutdown()
138-
server.awaitShutdown()
139-
server = null
140-
}
148+
if (server != null) {
149+
server.shutdown()
150+
server.awaitShutdown()
151+
server = null
152+
}
141153

142-
// On Windows, `logDirs` is left open even after Kafka server above is completely shut down
143-
// in some cases. It leads to test failures on Windows if the directory deletion failure
144-
// throws an exception.
145-
brokerConf.logDirs.foreach { f =>
146-
try {
147-
Utils.deleteRecursively(new File(f))
148-
} catch {
149-
case e: IOException if Utils.isWindows =>
150-
logWarning(e.getMessage)
154+
// On Windows, `logDirs` is left open even after Kafka server above is completely shut down
155+
// in some cases. It leads to test failures on Windows if the directory deletion failure
156+
// throws an exception.
157+
brokerConf.logDirs.foreach { f =>
158+
try {
159+
Utils.deleteRecursively(new File(f))
160+
} catch {
161+
case e: IOException if Utils.isWindows =>
162+
logWarning(e.getMessage)
163+
}
151164
}
152-
}
153165

154-
if (zkUtils != null) {
155-
zkUtils.close()
156-
zkUtils = null
157-
}
166+
if (zkUtils != null) {
167+
zkUtils.close()
168+
zkUtils = null
169+
}
158170

159-
if (zookeeper != null) {
160-
zookeeper.shutdown()
161-
zookeeper = null
171+
if (zookeeper != null) {
172+
zookeeper.shutdown()
173+
zookeeper = null
174+
}
175+
} finally {
176+
Exit.resetExitProcedure()
177+
Exit.resetHaltProcedure()
162178
}
163179
}
164180

@@ -217,7 +233,8 @@ private[kafka010] class KafkaTestUtils extends Logging {
217233
private def brokerConfiguration: Properties = {
218234
val props = new Properties()
219235
props.put("broker.id", "0")
220-
props.put("host.name", "localhost")
236+
props.put("host.name", "127.0.0.1")
237+
props.put("advertised.host.name", "127.0.0.1")
221238
props.put("port", brokerPort.toString)
222239
props.put("log.dir", brokerLogDir)
223240
props.put("zookeeper.connect", zkAddress)

0 commit comments

Comments
 (0)