@@ -19,7 +19,7 @@ package org.apache.spark.sql.kafka010
1919
2020import  java .io .{File , IOException }
2121import  java .lang .{Integer  =>  JInt }
22- import  java .net .InetSocketAddress 
22+ import  java .net .{ InetAddress ,  InetSocketAddress } 
2323import  java .nio .charset .StandardCharsets 
2424import  java .util .{Collections , Map  =>  JMap , Properties , UUID }
2525import  java .util .concurrent .TimeUnit 
@@ -68,10 +68,13 @@ class KafkaTestUtils(
6868
6969  private  val  JAVA_AUTH_CONFIG  =  " java.security.auth.login.config" 
7070
71+   private  val  localCanonicalHostName  =  InetAddress .getLoopbackAddress().getCanonicalHostName()
72+   logInfo(s " Local host name is  $localCanonicalHostName" )
73+ 
7174  private  var  kdc :  MiniKdc  =  _
7275
7376  //  Zookeeper related configurations
74-   private  val  zkHost  =  " localhost " 
77+   private  val  zkHost  =  localCanonicalHostName 
7578  private  var  zkPort :  Int  =  0 
7679  private  val  zkConnectionTimeout  =  60000 
7780  private  val  zkSessionTimeout  =  10000 
@@ -80,12 +83,12 @@ class KafkaTestUtils(
8083  private  var  zkUtils :  ZkUtils  =  _
8184
8285  //  Kafka broker related configurations
83-   private  val  brokerHost  =  " localhost " 
86+   private  val  brokerHost  =  localCanonicalHostName 
8487  private  var  brokerPort  =  0 
8588  private  var  brokerConf :  KafkaConfig  =  _
8689
8790  private  val  brokerServiceName  =  " kafka" 
88-   private  val  clientUser  =  " client/localhost " 
91+   private  val  clientUser  =  s " client/ $localCanonicalHostName " 
8992  private  var  clientKeytabFile :  File  =  _
9093
9194  //  Kafka broker server
@@ -139,17 +142,17 @@ class KafkaTestUtils(
139142    assert(kdcReady, " KDC should be set up beforehand"  )
140143    val  baseDir  =  Utils .createTempDir()
141144
142-     val  zkServerUser  =  " zookeeper/localhost " 
145+     val  zkServerUser  =  s " zookeeper/ $localCanonicalHostName " 
143146    val  zkServerKeytabFile  =  new  File (baseDir, " zookeeper.keytab"  )
144147    kdc.createPrincipal(zkServerKeytabFile, zkServerUser)
145148    logDebug(s " Created keytab file:  ${zkServerKeytabFile.getAbsolutePath()}" )
146149
147-     val  zkClientUser  =  " zkclient/localhost " 
150+     val  zkClientUser  =  s " zkclient/ $localCanonicalHostName " 
148151    val  zkClientKeytabFile  =  new  File (baseDir, " zkclient.keytab"  )
149152    kdc.createPrincipal(zkClientKeytabFile, zkClientUser)
150153    logDebug(s " Created keytab file:  ${zkClientKeytabFile.getAbsolutePath()}" )
151154
152-     val  kafkaServerUser  =  " kafka/localhost " 
155+     val  kafkaServerUser  =  s " kafka/ $localCanonicalHostName " 
153156    val  kafkaServerKeytabFile  =  new  File (baseDir, " kafka.keytab"  )
154157    kdc.createPrincipal(kafkaServerKeytabFile, kafkaServerUser)
155158    logDebug(s " Created keytab file:  ${kafkaServerKeytabFile.getAbsolutePath()}" )
0 commit comments