
Conversation

@foxish (Member) commented Apr 1, 2017

With these changes, Spark shell "almost" works.

```
./bin/spark-submit --master=k8s://<apiserver-ip:port> --deploy-mode=cluster --class=org.apache.spark.repl.Main \
  --conf spark.kubernetes.driver.docker.image=foxish/spark-driver:vtry11 \
  --conf spark.kubernetes.executor.docker.image=kubespark/spark-executor:v2.1.0-kubernetes-0.1.0-rc1 \
  --conf spark.driver.extraJavaOptions="-Dscala.usejavacp=true"
```

A `kubectl attach` to the driver pod shows:

```
2017-04-01 01:53:31 WARN  NativeCodeLoader:62 - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Spark context Web UI available at http://10.0.0.111:4040
Spark context available as 'sc' (master = k8s://https://35.184.141.248, app id = org-apache-spark-repl-main-1491011566260).
Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.1.0-k8s-0.1.0-SNAPSHOT
      /_/

Using Scala version 2.11.8 (OpenJDK 64-Bit Server VM, Java 1.8.0_121)
Type in expressions to have them evaluated.
Type :help for more information.

scala> :quit
2017-04-01 01:53:49 INFO  KubernetesSparkRestServer$KubernetesSubmitRequestServlet:54 - Spark application complete. Shutting down submission server...
2017-04-01 01:53:49 INFO  ServerConnector:306 - Stopped ServerConnector@4f905baf{HTTP/1.1}{org-apache-spark-repl-main-1491011566260:7077}
```

I think spark-shell actually forks off a separate process, which is why the REST submission driver immediately stops and marks the driver as succeeded. To detect when the driver can stop running, we probably need to watch for subprocesses to exit as well.
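One hypothetical sketch of that idea (not the actual submission-server code): fork the driver main class as a child JVM and block until the child, and therefore the REPL it hosts, actually exits before shutting the server down.

```
import scala.sys.process._

object DriverProcessWatcher {
  // Launch a command as a child process and block until it exits.
  def runAndAwait(command: Seq[String]): Int = {
    val proc = Process(command).run() // forks the child process
    proc.exitValue()                  // blocks until the child terminates
  }

  def main(args: Array[String]): Unit = {
    // Run the REPL main class in a child JVM; only report completion
    // (and stop the submission server) after the child exits.
    val code = runAndAwait(Seq(
      "java", "-Dscala.usejavacp=true",
      "-cp", sys.props("java.class.path"),
      "org.apache.spark.repl.Main"))
    println(s"Driver subprocess exited with code $code; safe to stop the submission server.")
  }
}
```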

@lins05 commented Apr 1, 2017

I wonder if it would lead to confusion: the Spark REPL is used in "client" deploy mode with all other cluster managers, while here we use it in "cluster" deploy mode. IMHO it's better to follow the convention.

Anyway, I think this patch is exciting!
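For reference, the convention @lins05 describes: on other cluster managers the REPL always runs in client deploy mode (spark-submit rejects cluster deploy mode for shells), e.g.:

```
./bin/spark-shell --master yarn --deploy-mode client
```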

A review comment was left on this snippet:

```
val fileBase64 = Base64.encodeBase64String(fileBytes)
UploadedAppResource(resourceBase64Contents = fileBase64, name = appFile.getName)
case "local" => ContainerAppResource(mainAppResource)
case "nop" => NopAppResource()
```

This nop resource is, I think, the same concept Spark already has in SparkLauncher#NO_RESOURCE.

Seems like this bit of your PR could be slightly modified to address #213?
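If helpful, here is a hypothetical sketch of that modification, assuming the `AppResource` hierarchy from this PR and matching on the raw resource string rather than a pre-parsed scheme. `SparkLauncher.NO_RESOURCE` is the real constant `"spark-internal"` from Spark's launcher module; the rest is illustrative only.

```
import java.io.File
import java.nio.file.Files
import org.apache.commons.codec.binary.Base64
import org.apache.spark.launcher.SparkLauncher

// Sketch only: AppResource and its subtypes are the types from this PR.
def resolveAppResource(mainAppResource: String): AppResource =
  mainAppResource match {
    // SparkLauncher.NO_RESOURCE is "spark-internal", the value spark-submit
    // already uses when no primary resource is needed.
    case SparkLauncher.NO_RESOURCE =>
      NopAppResource()
    case res if res.startsWith("local://") =>
      ContainerAppResource(res.stripPrefix("local://"))
    case path =>
      // Anything else is treated as a local file to upload, base64-encoded.
      val appFile = new File(path)
      val fileBytes = Files.readAllBytes(appFile.toPath)
      val fileBase64 = Base64.encodeBase64String(fileBytes)
      UploadedAppResource(resourceBase64Contents = fileBase64, name = appFile.getName)
  }
```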

@ash211 commented Apr 5, 2017

I agree with @lins05 that the strong expectation for Spark users running spark-shell is that the driver runs locally on their laptop, so they can access local files through the shell, e.g. with sc.textFile("/Users/aash/myfile.txt").

There are still situations where it's useful to have the shell running in cluster mode like this via the attach, though (e.g. when data is stored in a file/object store of some sort rather than on local disk).

Getting the shell working seamlessly would be a big win for Spark on k8s, for sure.

@foxish closed this May 14, 2017
@foxish pushed a commit that referenced this pull request on Jul 24, 2017; its truncated title ends in "…nd.stop". The commit message follows:

## What changes were proposed in this pull request?

`o.a.s.streaming.StreamingContextSuite.SPARK-18560 Receiver data should be deserialized properly` is flaky because there is a potential deadlock in StandaloneSchedulerBackend which causes the `await` to time out. Here is the related stack trace:
```
"Thread-31" #211 daemon prio=5 os_prio=31 tid=0x00007fedd4808000 nid=0x16403 waiting on condition [0x00007000239b7000]
   java.lang.Thread.State: TIMED_WAITING (parking)
	at sun.misc.Unsafe.park(Native Method)
	- parking to wait for  <0x000000079b49ca10> (a scala.concurrent.impl.Promise$CompletionLatch)
	at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
	at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedNanos(AbstractQueuedSynchronizer.java:1037)
	at java.util.concurrent.locks.AbstractQueuedSynchronizer.tryAcquireSharedNanos(AbstractQueuedSynchronizer.java:1328)
	at scala.concurrent.impl.Promise$DefaultPromise.tryAwait(Promise.scala:208)
	at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:218)
	at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
	at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:201)
	at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
	at org.apache.spark.rpc.RpcEndpointRef.askSync(RpcEndpointRef.scala:92)
	at org.apache.spark.rpc.RpcEndpointRef.askSync(RpcEndpointRef.scala:76)
	at org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend.stop(CoarseGrainedSchedulerBackend.scala:402)
	at org.apache.spark.scheduler.cluster.StandaloneSchedulerBackend.org$apache$spark$scheduler$cluster$StandaloneSchedulerBackend$$stop(StandaloneSchedulerBackend.scala:213)
	- locked <0x00000007066fca38> (a org.apache.spark.scheduler.cluster.StandaloneSchedulerBackend)
	at org.apache.spark.scheduler.cluster.StandaloneSchedulerBackend.stop(StandaloneSchedulerBackend.scala:116)
	- locked <0x00000007066fca38> (a org.apache.spark.scheduler.cluster.StandaloneSchedulerBackend)
	at org.apache.spark.scheduler.TaskSchedulerImpl.stop(TaskSchedulerImpl.scala:517)
	at org.apache.spark.scheduler.DAGScheduler.stop(DAGScheduler.scala:1657)
	at org.apache.spark.SparkContext$$anonfun$stop$8.apply$mcV$sp(SparkContext.scala:1921)
	at org.apache.spark.util.Utils$.tryLogNonFatalError(Utils.scala:1302)
	at org.apache.spark.SparkContext.stop(SparkContext.scala:1920)
	at org.apache.spark.streaming.StreamingContext.stop(StreamingContext.scala:708)
	at org.apache.spark.streaming.StreamingContextSuite$$anonfun$43$$anonfun$apply$mcV$sp$66$$anon$3.run(StreamingContextSuite.scala:827)

"dispatcher-event-loop-3" #18 daemon prio=5 os_prio=31 tid=0x00007fedd603a000 nid=0x6203 waiting for monitor entry [0x0000700003be4000]
   java.lang.Thread.State: BLOCKED (on object monitor)
	at org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend$DriverEndpoint.org$apache$spark$scheduler$cluster$CoarseGrainedSchedulerBackend$DriverEndpoint$$makeOffers(CoarseGrainedSchedulerBackend.scala:253)
	- waiting to lock <0x00000007066fca38> (a org.apache.spark.scheduler.cluster.StandaloneSchedulerBackend)
	at org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend$DriverEndpoint$$anonfun$receive$1.applyOrElse(CoarseGrainedSchedulerBackend.scala:124)
	at org.apache.spark.rpc.netty.Inbox$$anonfun$process$1.apply$mcV$sp(Inbox.scala:117)
	at org.apache.spark.rpc.netty.Inbox.safelyCall(Inbox.scala:205)
	at org.apache.spark.rpc.netty.Inbox.process(Inbox.scala:101)
	at org.apache.spark.rpc.netty.Dispatcher$MessageLoop.run(Dispatcher.scala:213)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)
```

This PR removes `synchronized` and changes `stopping` to an `AtomicBoolean` so that `stop()` is idempotent, which fixes the deadlock.
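A minimal sketch of that pattern (a simplified stand-in, not the actual StandaloneSchedulerBackend code; `shutDownSlowly` stands in for the blocking superclass `stop()` / RPC call):

```
import java.util.concurrent.atomic.AtomicBoolean

// An AtomicBoolean guard replaces `synchronized`, so stop() is idempotent
// and no monitor is held while the slow shutdown path blocks.
class SchedulerBackendSketch {
  private val stopping = new AtomicBoolean(false)

  def stop(): Unit = {
    // compareAndSet flips false -> true exactly once; repeated or concurrent
    // callers return immediately instead of blocking on `this` lock, so the
    // dispatcher thread can never deadlock against the stopping thread.
    if (stopping.compareAndSet(false, true)) {
      shutDownSlowly()
    }
  }

  // Stand-in for the blocking superclass stop() / RpcEndpointRef.askSync call.
  private def shutDownSlowly(): Unit = Thread.sleep(100)
}
```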

## How was this patch tested?

Jenkins

Author: Shixiong Zhu <[email protected]>

Closes apache#17610 from zsxwing/SPARK-20131.
@foxish deleted the spark-shell-wip branch July 25, 2017 00:47