Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 12 additions & 11 deletions core/src/main/scala/org/apache/spark/ui/ConsoleProgressBar.scala
Original file line number Diff line number Diff line change
Expand Up @@ -30,42 +30,43 @@ import org.apache.spark.internal.Logging
*/
private[spark] class ConsoleProgressBar(sc: SparkContext) extends Logging {
// Carriage return
val CR = '\r'
private val CR = '\r'
// Update period of progress bar, in milliseconds
val UPDATE_PERIOD = 200L
private val updatePeriodMSec =
sc.getConf.getTimeAsMs("spark.ui.consoleProgress.update.interval", "200")
// Delay to show up a progress bar, in milliseconds
val FIRST_DELAY = 500L
private val firstDelayMSec = 500L

// The width of terminal
val TerminalWidth = if (!sys.env.getOrElse("COLUMNS", "").isEmpty) {
private val TerminalWidth = if (!sys.env.getOrElse("COLUMNS", "").isEmpty) {
sys.env.get("COLUMNS").get.toInt
} else {
80
}

var lastFinishTime = 0L
var lastUpdateTime = 0L
var lastProgressBar = ""
private var lastFinishTime = 0L
private var lastUpdateTime = 0L
private var lastProgressBar = ""

// Schedule a refresh thread to run periodically
private val timer = new Timer("refresh progress", true)
timer.schedule(new TimerTask{
override def run() {
refresh()
}
}, FIRST_DELAY, UPDATE_PERIOD)
}, firstDelayMSec, updatePeriodMSec)

/**
* Try to refresh the progress bar in every cycle
*/
private def refresh(): Unit = synchronized {
val now = System.currentTimeMillis()
if (now - lastFinishTime < FIRST_DELAY) {
if (now - lastFinishTime < firstDelayMSec) {
return
}
val stageIds = sc.statusTracker.getActiveStageIds()
val stages = stageIds.flatMap(sc.statusTracker.getStageInfo).filter(_.numTasks() > 1)
.filter(now - _.submissionTime() > FIRST_DELAY).sortBy(_.stageId())
.filter(now - _.submissionTime() > firstDelayMSec).sortBy(_.stageId())
if (stages.length > 0) {
show(now, stages.take(3)) // display at most 3 stages in same time
}
Expand Down Expand Up @@ -94,7 +95,7 @@ private[spark] class ConsoleProgressBar(sc: SparkContext) extends Logging {
header + bar + tailer
}.mkString("")

// only refresh if it's changed of after 1 minute (or the ssh connection will be closed
// only refresh if it's changed OR after 1 minute (or the ssh connection will be closed
// after idle some time)
if (bar != lastProgressBar || now - lastUpdateTime > 60 * 1000L) {
System.err.print(CR + bar)
Expand Down