138 changes: 136 additions & 2 deletions repl/scala-2.11/src/main/scala/org/apache/spark/repl/SparkILoop.scala
@@ -22,8 +22,16 @@ import java.io.BufferedReader
 // scalastyle:off println
 import scala.Predef.{println => _, _}
 // scalastyle:on println
+import scala.concurrent.Future
+import scala.reflect.classTag
+import scala.reflect.internal.util.ScalaClassLoader.savingContextLoader
+import scala.reflect.io.File
+import scala.tools.nsc.{GenericRunnerSettings, Properties}
 import scala.tools.nsc.Settings
-import scala.tools.nsc.interpreter.{ILoop, JPrintWriter}
+import scala.tools.nsc.interpreter.{isReplDebug, isReplPower, replProps}
+import scala.tools.nsc.interpreter.{AbstractOrMissingHandler, ILoop, IMain, JPrintWriter}
+import scala.tools.nsc.interpreter.{NamedParam, SimpleReader, SplashLoop, SplashReader}
+import scala.tools.nsc.interpreter.StdReplTags.tagOfIMain
 import scala.tools.nsc.util.stringFromStream
 import scala.util.Properties.{javaVersion, javaVmName, versionString}
 
@@ -36,7 +44,7 @@ class SparkILoop(in0: Option[BufferedReader], out: JPrintWriter)
   def this() = this(None, new JPrintWriter(Console.out, true))
 
   override def createInterpreter(): Unit = {
-    intp = new SparkILoopInterpreter(settings, out, initializeSpark)
+    intp = new SparkILoopInterpreter(settings, out)
   }
 
   val initializationCommands: Seq[String] = Seq(
@@ -116,6 +124,132 @@ class SparkILoop(in0: Option[BufferedReader], out: JPrintWriter)
     super.replay()
   }
 
+  /**
+   * The following code is mostly a copy of the `process` implementation in Scala's `ILoop.scala`.
+   *
+   * In newer versions of Scala, `printWelcome` is the first thing to be called, so the Spark UI
+   * URL would always be shown after the welcome message.
+   *
+   * However, this is inconsistent with earlier versions of Spark, which always show the Spark UI
+   * URL first.
+   *
+   * The only way to keep the behavior consistent is to duplicate the Scala code here.
+   *
+   * We should remove this duplication once Scala provides a hook for loading custom
+   * initialization code and for customizing the order in which the welcome message is printed.
+   */
+  override def process(settings: Settings): Boolean = savingContextLoader {
+
+    def newReader = in0.fold(chooseReader(settings))(r => SimpleReader(r, out, interactive = true))
+
+    /** Reader to use before interpreter is online. */
+    def preLoop = {
+      val sr = SplashReader(newReader) { r =>
+        in = r
+        in.postInit()
+      }
+      in = sr
+      SplashLoop(sr, prompt)
+    }
+
+    /* Actions to cram in parallel while collecting first user input at prompt.
+     * Run with output muted both from ILoop and from the intp reporter.
+     */
+    def loopPostInit(): Unit = mumly {
+      // Bind intp somewhere out of the regular namespace where
+      // we can get at it in generated code.
+      intp.quietBind(NamedParam[IMain]("$intp", intp)(tagOfIMain, classTag[IMain]))
+
+      // Auto-run code via some setting.
+      ( replProps.replAutorunCode.option
+          flatMap (f => File(f).safeSlurp())
+          foreach (intp quietRun _)
+      )
+      // power mode setup
+      if (isReplPower) enablePowerMode(true)
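+      // Spark-specific addition (not in the upstream ILoop code): initialize Spark here,
+      // before printWelcome() runs in startup(), so the Spark UI URL is printed first.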
+      initializeSpark()
+      loadInitFiles()
+      // SI-7418 Now, and only now, can we enable TAB completion.
+      in.postInit()
+    }
+    def loadInitFiles(): Unit = settings match {
+      case settings: GenericRunnerSettings =>
+        for (f <- settings.loadfiles.value) {
+          loadCommand(f)
+          addReplay(s":load $f")
+        }
+        for (f <- settings.pastefiles.value) {
+          pasteCommand(f)
+          addReplay(s":paste $f")
+        }
+      case _ =>
+    }
+    // wait until after startup to enable noisy settings
+    def withSuppressedSettings[A](body: => A): A = {
+      val ss = this.settings
+      import ss._
+      val noisy = List(Xprint, Ytyperdebug)
+      val noisesome = noisy.exists(!_.isDefault)
+      val current = (Xprint.value, Ytyperdebug.value)
+      if (isReplDebug || !noisesome) body
+      else {
+        this.settings.Xprint.value = List.empty
+        this.settings.Ytyperdebug.value = false
+        try body
+        finally {
+          Xprint.value = current._1
+          Ytyperdebug.value = current._2
+          intp.global.printTypings = current._2
+        }
+      }
+    }
+    def startup(): String = withSuppressedSettings {
+      // let them start typing
+      val splash = preLoop
+
+      // while we go fire up the REPL
+      try {
+        // don't allow ancient sbt to hijack the reader
+        savingReader {
+          createInterpreter()
+        }
+        intp.initializeSynchronous()
+
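+        // Set ILoop's private globalFuture field via reflection (there is no setter); this
+        // marks the interpreter's asynchronous initialization as already complete.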
+        val field = classOf[ILoop].getDeclaredFields.filter(_.getName.contains("globalFuture")).head
+        field.setAccessible(true)
+        field.set(this, Future successful true)

Review comment (Member Author): This reflection has to be used to access the private globalFuture in ILoop.
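
For illustration, here is a minimal self-contained sketch of the same reflection pattern — with a hypothetical Host class standing in for ILoop and a ready field standing in for globalFuture; neither is part of this PR:

import scala.concurrent.Future

class Host { private var ready: Future[Boolean] = _ }

object ReflectionSketch {
  def main(args: Array[String]): Unit = {
    val host = new Host
    // getDeclaredFields sees private members, unlike getFields.
    val field = classOf[Host].getDeclaredFields.filter(_.getName.contains("ready")).head
    field.setAccessible(true)                 // lift the private-access check
    field.set(host, Future.successful(true))  // inject an already-completed future
    println(field.get(host).asInstanceOf[Future[Boolean]].isCompleted) // prints: true
  }
}
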
+
+        if (intp.reporter.hasErrors) {
+          echo("Interpreter encountered errors during initialization!")
+          null
+        } else {
+          loopPostInit()
+          printWelcome()
+          splash.start()
+
+          val line = splash.line // what they typed in while they were waiting
+          if (line == null) { // they ^D
+            try out print Properties.shellInterruptedString
+            finally closeInterpreter()
+          }
+          line
+        }
+      } finally splash.stop()
+    }
+
+    this.settings = settings
+    startup() match {
+      case null => false
+      case line =>
+        try loop(line) match {
+          case LineResults.EOF => out print Properties.shellInterruptedString
+          case _ =>
+        }
+        catch AbstractOrMissingHandler()
+        finally closeInterpreter()
+        true
+    }
+  }
 }
 
 object SparkILoop {

repl/scala-2.11/src/main/scala/org/apache/spark/repl/SparkILoopInterpreter.scala
@@ -21,22 +21,8 @@ import scala.collection.mutable
 import scala.tools.nsc.Settings
 import scala.tools.nsc.interpreter._
 
-class SparkILoopInterpreter(settings: Settings, out: JPrintWriter, initializeSpark: () => Unit)
-  extends IMain(settings, out) { self =>
-
-  /**
-   * We override `initializeSynchronous` to initialize Spark *after* `intp` is properly initialized
-   * and *before* the REPL sees any files in the private `loadInitFiles` functions, so that
-   * the Spark context is visible in those files.
-   *
-   * This is a bit of a hack, but there isn't another hook available to us at this point.
-   *
-   * See the discussion in Scala community https://github.com/scala/bug/issues/10913 for detail.
-   */
-  override def initializeSynchronous(): Unit = {
-    super.initializeSynchronous()
-    initializeSpark()
-  }
+class SparkILoopInterpreter(settings: Settings, out: JPrintWriter) extends IMain(settings, out) {
+  self =>
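+
+  // Note: initializeSpark() is no longer passed in or invoked here; SparkILoop.process() now
+  // calls it from loopPostInit(), after the interpreter is initialized and before any
+  // :load / :paste init files are processed.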
 
   override lazy val memberHandlers = new {
     val intp: self.type = self