2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/HttpServer.scala
@@ -160,7 +160,7 @@ private[spark] class HttpServer(
       throw new ServerStateException("Server is not started")
     } else {
       val scheme = if (securityManager.fileServerSSLOptions.enabled) "https" else "http"
-      s"$scheme://${Utils.localIpAddress}:$port"
+      s"$scheme://${Utils.localHostNameForURI()}:$port"
     }
   }
 }
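
The substance of this change is URI correctness: a raw IPv6 literal cannot be interpolated into a URI authority, since RFC 2732 requires brackets around it, while IPv4 needs none. Guava's InetAddresses.toUriString handles both cases, and per the Utils.scala diff below, localHostNameForURI() delegates to it. A minimal sketch (object name and sample addresses are illustrative, not part of the patch):

import java.net.InetAddress
import com.google.common.net.InetAddresses

object UriHostSketch {
  def main(args: Array[String]): Unit = {
    val v4 = InetAddress.getByName("192.168.1.10")
    val v6 = InetAddress.getByName("2001:db8::1")
    // IPv4 passes through unchanged; IPv6 gets the required brackets.
    println(InetAddresses.toUriString(v4)) // 192.168.1.10
    println(InetAddresses.toUriString(v6)) // [2001:db8::1]
    // Interpolating the raw literal would yield "http://2001:db8::1:8080",
    // where it is ambiguous which colon separates the port.
    println(s"http://${InetAddresses.toUriString(v6)}:8080") // http://[2001:db8::1]:8080
  }
}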
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala
@@ -53,7 +53,7 @@ class LocalSparkCluster(
     /* Start the Master */
     val (masterSystem, masterPort, _, _) = Master.startSystemAndActor(localHostname, 0, 0, _conf)
     masterActorSystems += masterSystem
-    val masterUrl = "spark://" + localHostname + ":" + masterPort
+    val masterUrl = "spark://" + Utils.localHostNameForURI() + ":" + masterPort
     val masters = Array(masterUrl)

     /* Start the Workers */
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/deploy/client/TestClient.scala
@@ -46,7 +46,7 @@ private[spark] object TestClient {
   def main(args: Array[String]) {
     val url = args(0)
     val conf = new SparkConf
-    val (actorSystem, _) = AkkaUtils.createActorSystem("spark", Utils.localIpAddress, 0,
+    val (actorSystem, _) = AkkaUtils.createActorSystem("spark", Utils.localHostName(), 0,
       conf = conf, securityManager = new SecurityManager(conf))
     val desc = new ApplicationDescription("TestClient", Some(1), 512,
       Command("spark.deploy.client.TestExecutor", Seq(), Map(), Seq(), Seq(), Seq()), "ignored")
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/ui/WebUI.scala
@@ -48,7 +48,7 @@ private[spark] abstract class WebUI(
   protected val handlers = ArrayBuffer[ServletContextHandler]()
   protected val pageToHandlers = new HashMap[WebUIPage, ArrayBuffer[ServletContextHandler]]
   protected var serverInfo: Option[ServerInfo] = None
-  protected val localHostName = Utils.localHostName()
+  protected val localHostName = Utils.localHostNameForURI()
   protected val publicHostName = Option(conf.getenv("SPARK_PUBLIC_DNS")).getOrElse(localHostName)
   private val className = Utils.getFormattedClassName(this)
34 changes: 21 additions & 13 deletions core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -34,6 +34,7 @@ import scala.util.Try
 import scala.util.control.{ControlThrowable, NonFatal}

 import com.google.common.io.{ByteStreams, Files}
+import com.google.common.net.InetAddresses
 import com.google.common.util.concurrent.ThreadFactoryBuilder
 import org.apache.commons.lang3.SystemUtils
 import org.apache.hadoop.conf.Configuration
@@ -789,13 +790,12 @@ private[spark] object Utils extends Logging {
    * Get the local host's IP address in dotted-quad format (e.g. 1.2.3.4).
    * Note, this is typically not used from within core spark.
    */
-  lazy val localIpAddress: String = findLocalIpAddress()
-  lazy val localIpAddressHostname: String = getAddressHostName(localIpAddress)
+  private lazy val localIpAddress: InetAddress = findLocalInetAddress()

-  private def findLocalIpAddress(): String = {
+  private def findLocalInetAddress(): InetAddress = {
     val defaultIpOverride = System.getenv("SPARK_LOCAL_IP")
     if (defaultIpOverride != null) {
-      defaultIpOverride
+      InetAddress.getByName(defaultIpOverride)
     } else {
       val address = InetAddress.getLocalHost
       if (address.isLoopbackAddress) {
@@ -806,23 +806,28 @@
         // It's more proper to pick ip address following system output order.
         val activeNetworkIFs = NetworkInterface.getNetworkInterfaces.toList
         val reOrderedNetworkIFs = if (isWindows) activeNetworkIFs else activeNetworkIFs.reverse
+
         for (ni <- reOrderedNetworkIFs) {
-          for (addr <- ni.getInetAddresses if !addr.isLinkLocalAddress &&
-            !addr.isLoopbackAddress && addr.isInstanceOf[Inet4Address]) {
+          val addresses = ni.getInetAddresses.toList
+            .filterNot(addr => addr.isLinkLocalAddress || addr.isLoopbackAddress)
+          if (addresses.nonEmpty) {
+            val addr = addresses.find(_.isInstanceOf[Inet4Address]).getOrElse(addresses.head)
+            // because of Inet6Address.toHostName may add interface at the end if it knows about it
+            val strippedAddress = InetAddress.getByAddress(addr.getAddress)

Member:
Tell me more about what this fixes?

Contributor Author:
It's all about finding our external IP address: the old logic was to use external ipv4 || 127.0.0.1.
That transforms it to ipv4 || ipv6 || 127.0.0.1.

Whenever we don't have any ipv4, it's better to use ipv6 than 127.0.0.1.

Member:
OK, though I don't think that was the thrust of this JIRA, which was simply to format IPv6 addresses correctly. I think the reasoning makes some sense though.

Nits: can you use filterNot(addr => addr.isLinkLocalAddress || addr.isLoopbackAddress)? And if (addresses.notEmpty) {? And find(_.isInstanceOf[Inet4Address])? Just questions of style in this block.

It might be worth a comment to the effect of your reply above here.

Contributor Author:
Ok, I agree with that. Should I make an additional commit or change the existing one?
*nonEmpty

Member:
Just push more commits to this branch

             // We've found an address that looks reasonable!
             logWarning("Your hostname, " + InetAddress.getLocalHost.getHostName + " resolves to" +
-              " a loopback address: " + address.getHostAddress + "; using " + addr.getHostAddress +
-              " instead (on interface " + ni.getName + ")")
+              " a loopback address: " + address.getHostAddress + "; using " +
+              strippedAddress.getHostAddress + " instead (on interface " + ni.getName + ")")
             logWarning("Set SPARK_LOCAL_IP if you need to bind to another address")
-            return addr.getHostAddress
+            return strippedAddress
           }
         }
         logWarning("Your hostname, " + InetAddress.getLocalHost.getHostName + " resolves to" +
           " a loopback address: " + address.getHostAddress + ", but we couldn't find any" +
           " external IP address!")
         logWarning("Set SPARK_LOCAL_IP if you need to bind to another address")
       }
-      address.getHostAddress
+      address
     }
   }
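
To make the preference order from the review thread above concrete (ipv4 || ipv6 || 127.0.0.1), here is a self-contained sketch of the same selection logic; the object and method names are hypothetical, not Spark API:

import java.net.{Inet4Address, InetAddress, NetworkInterface}
import scala.collection.JavaConverters._

object AddressPreferenceSketch {
  // Prefer a non-loopback IPv4 address, then any non-loopback IPv6
  // address, and fall back to loopback only when nothing else exists.
  def pick(): InetAddress = {
    val usable = NetworkInterface.getNetworkInterfaces.asScala
      .flatMap(_.getInetAddresses.asScala)
      .filterNot(a => a.isLinkLocalAddress || a.isLoopbackAddress)
      .toList
    usable.find(_.isInstanceOf[Inet4Address])   // ipv4 first
      .orElse(usable.headOption)                // then any ipv6
      .getOrElse(InetAddress.getLoopbackAddress) // 127.0.0.1 last
  }
}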

@@ -842,11 +847,14 @@
    * Get the local machine's hostname.
    */
   def localHostName(): String = {
-    customHostname.getOrElse(localIpAddressHostname)
+    customHostname.getOrElse(localIpAddress.getHostAddress)
   }

-  def getAddressHostName(address: String): String = {
-    InetAddress.getByName(address).getHostName
+  /**
+   * Get the local machine's URI.
+   */
+  def localHostNameForURI(): String = {
+    customHostname.getOrElse(InetAddresses.toUriString(localIpAddress))
   }

   def checkHost(host: String, message: String = "") {
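
One more subtlety from the hunks above: the strippedAddress rebuild exists because a scoped IPv6 address prints with a trailing zone id (e.g. %eth0), which is not valid in a URI host. InetAddress.getByAddress sees only the 16 raw address bytes, so the rebuilt address carries no scope. A small demo, assuming a numeric scope id so the literal resolves without needing a matching interface name:

import java.net.InetAddress

object ScopeStripSketch {
  def main(args: Array[String]): Unit = {
    val scoped = InetAddress.getByName("fe80::1%1") // link-local, numeric scope id
    println(scoped.getHostAddress)                  // fe80:0:0:0:0:0:0:1%1
    // getAddress returns only the address bytes, dropping the scope.
    val stripped = InetAddress.getByAddress(scoped.getAddress)
    println(stripped.getHostAddress)                // fe80:0:0:0:0:0:0:1
  }
}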
3 changes: 2 additions & 1 deletion extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala
@@ -23,6 +23,7 @@ import org.apache.spark.Logging
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.streaming.Duration
 import org.apache.spark.streaming.receiver.Receiver
+import org.apache.spark.util.Utils

 import com.amazonaws.auth.AWSCredentialsProvider
 import com.amazonaws.auth.DefaultAWSCredentialsProviderChain
@@ -118,7 +119,7 @@ private[kinesis] class KinesisReceiver(
    * method.
    */
   override def onStart() {
-    workerId = InetAddress.getLocalHost.getHostAddress() + ":" + UUID.randomUUID()
+    workerId = Utils.localHostName() + ":" + UUID.randomUUID()
     credentialsProvider = new DefaultAWSCredentialsProviderChain()
     kinesisClientLibConfiguration = new KinesisClientLibConfiguration(appName, streamName,
       credentialsProvider, workerId).withKinesisEndpoint(endpointUrl)
3 changes: 2 additions & 1 deletion sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLEnv.scala
@@ -22,6 +22,7 @@ import scala.collection.JavaConversions._
 import org.apache.spark.scheduler.StatsReportListener
 import org.apache.spark.sql.hive.{HiveShim, HiveContext}
 import org.apache.spark.{Logging, SparkConf, SparkContext}
+import org.apache.spark.util.Utils

 /** A singleton object for the master program. The slaves should not access this. */
 private[hive] object SparkSQLEnv extends Logging {
@@ -37,7 +38,7 @@ private[hive] object SparkSQLEnv extends Logging {
     val maybeKryoReferenceTracking = sparkConf.getOption("spark.kryo.referenceTracking")

     sparkConf
-      .setAppName(s"SparkSQL::${java.net.InetAddress.getLocalHost.getHostName}")
+      .setAppName(s"SparkSQL::${Utils.localHostName()}")
       .set("spark.sql.hive.version", HiveShim.version)
       .set(
         "spark.serializer",