Fix executor connect issues, perhaps by using pod IPs #214
Description
I was running a Spark job using the DFSReadWriteTest code in the Spark examples jar. I had run it successfully before. This time, I launched two executors instead of one. All of a sudden, the job started to fail, with the executor pod logs showing the following. (Executors were trying to fetch shuffle results from each other's internal shuffle service.)
2017-04-03 20:44:56 ERROR RetryingBlockFetcher:142 - Exception while beginning fetch of 1 outstanding blocks
java.io.IOException: Failed to connect to spark-dfstest-1491252168491-exec-1:38066
at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:228)
at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:179)
at org.apache.spark.network.netty.NettyBlockTransferService$$anon$1.createAndStart(NettyBlockTransferService.scala:97)
at org.apache.spark.network.shuffle.RetryingBlockFetcher.fetchAllOutstanding(RetryingBlockFetcher.java:140)
at org.apache.spark.network.shuffle.RetryingBlockFetcher.start(RetryingBlockFetcher.java:120)
at org.apache.spark.network.netty.NettyBlockTransferService.fetchBlocks(NettyBlockTransferService.scala:106)
at org.apache.spark.storage.ShuffleBlockFetcherIterator.sendRequest(ShuffleBlockFetcherIterator.scala:169)
at org.apache.spark.storage.ShuffleBlockFetcherIterator.fetchUpToMaxBytes(ShuffleBlockFetcherIterator.scala:350)
at org.apache.spark.storage.ShuffleBlockFetcherIterator.initialize(ShuffleBlockFetcherIterator.scala:286)
at org.apache.spark.storage.ShuffleBlockFetcherIterator.<init>(ShuffleBlockFetcherIterator.scala:120)
at org.apache.spark.shuffle.BlockStoreShuffleReader.read(BlockStoreShuffleReader.scala:45)
at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:109)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:99)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.nio.channels.UnresolvedAddressException
at sun.nio.ch.Net.checkAddress(Net.java:101)
at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:622)
at io.netty.channel.socket.nio.NioSocketChannel.doConnect(NioSocketChannel.java:242)
at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.connect(AbstractNioChannel.java:205)
at io.netty.channel.DefaultChannelPipeline$HeadContext.connect(DefaultChannelPipeline.java:1226)
at io.netty.channel.AbstractChannelHandlerContext.invokeConnect(AbstractChannelHandlerContext.java:550)
at io.netty.channel.AbstractChannelHandlerContext.connect(AbstractChannelHandlerContext.java:535)
at io.netty.channel.ChannelOutboundHandlerAdapter.connect(ChannelOutboundHandlerAdapter.java:47)
at io.netty.channel.AbstractChannelHandlerContext.invokeConnect(AbstractChannelHandlerContext.java:550)
at io.netty.channel.AbstractChannelHandlerContext.connect(AbstractChannelHandlerContext.java:535)
at io.netty.channel.ChannelDuplexHandler.connect(ChannelDuplexHandler.java:50)
at io.netty.channel.AbstractChannelHandlerContext.invokeConnect(AbstractChannelHandlerContext.java:550)
at io.netty.channel.AbstractChannelHandlerContext.connect(AbstractChannelHandlerContext.java:535)
at io.netty.channel.AbstractChannelHandlerContext.connect(AbstractChannelHandlerContext.java:517)
at io.netty.channel.DefaultChannelPipeline.connect(DefaultChannelPipeline.java:970)
at io.netty.channel.AbstractChannel.connect(AbstractChannel.java:215)
at io.netty.bootstrap.Bootstrap$2.run(Bootstrap.java:166)
at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:408)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:455)
at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:140)
at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:144)
... 1 more
...
Here's my command line:
$ /usr/local/spark-on-k8s/bin/spark-submit \
--class org.apache.spark.examples.DFSReadWriteTest \
--conf spark.app.name=spark-dfstest \
--conf spark.executor.instances=2 \
--conf spark.hadoop.fs.defaultFS=hdfs://hdfs-namenode-0.hdfs-namenode.kube-system.svc.cluster.local:8020 \
/usr/local/spark-on-k8s/examples/jars/spark-examples_2.11-2.1.0-k8s-0.1.0-SNAPSHOT.jar
/etc/hosts /user/root/dfstest/4
I went inside a pod and tried nslookup, which also failed:
$ kubectl exec -it spark-dfstest-1491252168491-exec-2 /bin/bash
bash-4.3# nslookup spark-dfstest-1491252168491-exec-1
nslookup: can't resolve '(null)': Name does not resolve
nslookup: can't resolve 'spark-dfstest-1491252168491-exec-1': Try again
So it seemed like a kube-dns issue. Digging further, I found that kube-dns does not support resolving pod hostnames. (This was a surprise to me, although I think I knew it at some point and had forgotten.)
There is an issue in the kube-dns repo (issue 70), titled "Error with pod hostname query", asking about exactly this:
Query for the ..pods.cluster.local is not returning the A record for the pod
/ # dig customer-deployment-onuwixzygi4tkn3fgztgeylfgnrdm.62180010musmy.staging.pod.cluster.local.
kube-dns logs:
I0316 16:18:30.889074 1 logs.go:41] skydns: error from backend: Invalid IP Address customer.deployment.onuwixzygi4tkn3fgztgeylfgnrdm.62180010musmy
The answer says:
thockin commented:
Unless I misunderstand, we don't support that query, on purpose. We don't want DNS to have to watch all pods.
And the linked reference document also confirms that. kube-dns supports service name queries, but not pod name queries:
What things get DNS names?
Every Service defined in the cluster (including the DNS server itself) is assigned a DNS name. By default, a client Pod’s DNS search list will include the Pod’s own namespace and the cluster’s default domain.
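The pod IPs themselves, on the other hand, are known to the API server and are routable between pods; a quick check along these lines (the IP shown is only an illustration) confirms that:
$ kubectl get pod spark-dfstest-1491252168491-exec-1 -o jsonpath='{.status.podIP}'
10.244.1.23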
I think we should fix this. Two possibilities:
- Executors currently register with the driver using host names. Change the code to use pod IPs instead. Note that executors discover each other via the driver. I prototyped this and it seems to work (I'll send this as a PR in case this is what we want); see the sketch after this list.
- Use a headless Service for the executors, like a StatefulSet does. But this seems like overkill and goes against exactly what kube-dns wants to avoid.
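As a rough sketch of the first option (illustrative output only): inside the container, each executor already knows its own pod IP, because Kubernetes maps the pod's hostname to the pod IP in /etc/hosts, so the executor could advertise that IP to the driver instead of the unresolvable hostname:
bash-4.3# hostname -i
10.244.1.23
Alternatively, the IP could be injected explicitly as an environment variable via the downward API (fieldRef: status.podIP); either way, no DNS lookup is needed for executor-to-executor connections.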