1 change: 1 addition & 0 deletions .rat-excludes
@@ -82,4 +82,5 @@ INDEX
gen-java.*
.*avpr
org.apache.spark.sql.sources.DataSourceRegister
org.apache.spark.scheduler.SparkHistoryListenerFactory
.*parquet
3 changes: 3 additions & 0 deletions core/src/main/java/org/apache/spark/JavaSparkListener.java
@@ -82,4 +82,7 @@ public void onExecutorRemoved(SparkListenerExecutorRemoved executorRemoved) { }
@Override
public void onBlockUpdated(SparkListenerBlockUpdated blockUpdated) { }

@Override
public void onOtherEvent(SparkListenerEvent event) { }

}
@@ -118,4 +118,8 @@ public void onBlockUpdated(SparkListenerBlockUpdated blockUpdated) {
onEvent(blockUpdated);
}

@Override
public void onOtherEvent(SparkListenerEvent event) {
onEvent(event);
}
}
@@ -207,6 +207,10 @@ private[spark] class EventLoggingListener(
// No-op because logging every update would be overkill
override def onExecutorMetricsUpdate(event: SparkListenerExecutorMetricsUpdate): Unit = { }

override def onOtherEvent(event: SparkListenerEvent): Unit = {
logEvent(event, flushLogger = true)
}

/**
* Stop logging events. The event log file will be renamed so that it loses the
* ".inprogress" suffix.
24 changes: 22 additions & 2 deletions core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
@@ -22,15 +22,19 @@ import java.util.Properties
import scala.collection.Map
import scala.collection.mutable

import org.apache.spark.{Logging, TaskEndReason}
import com.fasterxml.jackson.annotation.JsonTypeInfo

import org.apache.spark.{Logging, SparkConf, TaskEndReason}
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.executor.TaskMetrics
import org.apache.spark.scheduler.cluster.ExecutorInfo
import org.apache.spark.storage.{BlockManagerId, BlockUpdatedInfo}
import org.apache.spark.util.{Distribution, Utils}
import org.apache.spark.ui.SparkUI

@DeveloperApi
sealed trait SparkListenerEvent
@JsonTypeInfo(use = JsonTypeInfo.Id.CLASS, include = JsonTypeInfo.As.PROPERTY, property = "Event")
trait SparkListenerEvent

@DeveloperApi
case class SparkListenerStageSubmitted(stageInfo: StageInfo, properties: Properties = null)
@@ -130,6 +134,17 @@ case class SparkListenerApplicationEnd(time: Long) extends SparkListenerEvent
*/
private[spark] case class SparkListenerLogStart(sparkVersion: String) extends SparkListenerEvent

/**
* Interface for creating history listeners defined in other modules like SQL, which are used to
* rebuild the history UI.
*/
private[spark] trait SparkHistoryListenerFactory {
/**
* Create listeners used to rebuild the history UI.
*/
def createListeners(conf: SparkConf, sparkUI: SparkUI): Seq[SparkListener]
}

/**
* :: DeveloperApi ::
* Interface for listening to events from the Spark scheduler. Note that this is an internal
@@ -223,6 +238,11 @@ trait SparkListener {
* Called when the driver receives a block update info.
*/
def onBlockUpdated(blockUpdated: SparkListenerBlockUpdated) { }

/**
* Called when other events like SQL-specific events are posted.
*/
def onOtherEvent(event: SparkListenerEvent) { }
}

/**
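For illustration (not part of this patch), a module outside core can now define its own event type and consume it through onOtherEvent. MyCustomEvent and MyCustomListener below are hypothetical names; such a listener would be registered with the existing SparkContext.addSparkListener API.

import org.apache.spark.scheduler.{SparkListener, SparkListenerEvent}

// Hypothetical event type defined outside Spark core; with the trait no longer
// sealed it only needs to extend SparkListenerEvent.
case class MyCustomEvent(description: String, time: Long) extends SparkListenerEvent

// Hypothetical listener that picks its own events out of the onOtherEvent stream.
class MyCustomListener extends SparkListener {
  override def onOtherEvent(event: SparkListenerEvent): Unit = event match {
    case MyCustomEvent(description, time) =>
      println(s"custom event at $time: $description")
    case _ => // another module's event; ignore it
  }
}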
@@ -61,6 +61,7 @@ private[spark] trait SparkListenerBus extends ListenerBus[SparkListener, SparkLi
case blockUpdated: SparkListenerBlockUpdated =>
listener.onBlockUpdated(blockUpdated)
case logStart: SparkListenerLogStart => // ignore event log metadata
case _ => listener.onOtherEvent(event)
}
}

16 changes: 14 additions & 2 deletions core/src/main/scala/org/apache/spark/ui/SparkUI.scala
@@ -17,10 +17,13 @@

package org.apache.spark.ui

import java.util.Date
import java.util.{Date, ServiceLoader}

import scala.collection.JavaConverters._

import org.apache.spark.status.api.v1.{ApiRootResource, ApplicationAttemptInfo, ApplicationInfo,
UIRoot}
import org.apache.spark.util.Utils
import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkContext}
import org.apache.spark.scheduler._
import org.apache.spark.storage.StorageStatusListener
@@ -150,7 +153,16 @@ private[spark] object SparkUI {
appName: String,
basePath: String,
startTime: Long): SparkUI = {
create(None, conf, listenerBus, securityManager, appName, basePath, startTime = startTime)
val sparkUI = create(
None, conf, listenerBus, securityManager, appName, basePath, startTime = startTime)

val listenerFactories = ServiceLoader.load(classOf[SparkHistoryListenerFactory],
Utils.getContextOrSparkClassLoader).asScala
listenerFactories.foreach { listenerFactory =>
val listeners = listenerFactory.createListeners(conf, sparkUI)
listeners.foreach(listenerBus.addListener)
}
sparkUI
}

/**
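To make the ServiceLoader lookup concrete: a history-listener provider implements SparkHistoryListenerFactory (the trait is private[spark], so the implementation must live in the org.apache.spark namespace, as the SQL module's SQLHistoryListenerFactory does) and lists its class name in a META-INF/services/org.apache.spark.scheduler.SparkHistoryListenerFactory resource. The factory below is a hypothetical sketch, not code from this patch.

// In the org.apache.spark namespace because SparkHistoryListenerFactory is private[spark].
package org.apache.spark.example

import org.apache.spark.SparkConf
import org.apache.spark.scheduler.{SparkHistoryListenerFactory, SparkListener}
import org.apache.spark.ui.SparkUI

// Hypothetical factory; ServiceLoader instantiates it reflectively, so it needs
// a public no-arg constructor, and its fully qualified name goes into the
// META-INF/services resource file mentioned above.
class MyHistoryListenerFactory extends SparkHistoryListenerFactory {
  override def createListeners(conf: SparkConf, sparkUI: SparkUI): Seq[SparkListener] = {
    // Return whatever listeners this module needs to rebuild its UI from event logs.
    Seq(new SparkListener {})
  }
}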
11 changes: 9 additions & 2 deletions core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -19,19 +19,21 @@ package org.apache.spark.util

import java.util.{Properties, UUID}

import org.apache.spark.scheduler.cluster.ExecutorInfo

import scala.collection.JavaConverters._
import scala.collection.Map

import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import org.json4s.DefaultFormats
import org.json4s.JsonDSL._
import org.json4s.JsonAST._
import org.json4s.jackson.JsonMethods._

import org.apache.spark._
import org.apache.spark.executor._
import org.apache.spark.rdd.RDDOperationScope
import org.apache.spark.scheduler._
import org.apache.spark.scheduler.cluster.ExecutorInfo
import org.apache.spark.storage._

/**
@@ -54,6 +56,8 @@

private implicit val format = DefaultFormats

private val mapper = new ObjectMapper().registerModule(DefaultScalaModule)

/** ------------------------------------------------- *
* JSON serialization methods for SparkListenerEvents |
* -------------------------------------------------- */
@@ -96,6 +100,7 @@
executorMetricsUpdateToJson(metricsUpdate)
case blockUpdated: SparkListenerBlockUpdated =>
throw new MatchError(blockUpdated) // TODO(ekl) implement this
case _ => parse(mapper.writeValueAsString(event))
}
}

@@ -504,6 +509,8 @@
case `executorRemoved` => executorRemovedFromJson(json)
case `logStart` => logStartFromJson(json)
case `metricsUpdate` => executorMetricsUpdateFromJson(json)
case other => mapper.readValue(compact(render(json)), Utils.classForName(other))
.asInstanceOf[SparkListenerEvent]
}
}

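The effect of the Jackson fallback is that any event JsonProtocol does not match explicitly is serialized through the ObjectMapper, with @JsonTypeInfo adding an "Event" property that holds the concrete class name; on the way back, that property selects the class passed to mapper.readValue. A rough round-trip sketch, using a hypothetical MyCustomEvent and placed in the org.apache.spark namespace only because JsonProtocol is private[spark]:

package org.apache.spark.example

import org.apache.spark.scheduler.SparkListenerEvent
import org.apache.spark.util.JsonProtocol
import org.json4s.jackson.JsonMethods.{compact, render}

// Hypothetical event; JsonProtocol's match has no case for it, so it takes the
// Jackson fallback path in both directions.
case class MyCustomEvent(description: String) extends SparkListenerEvent

object JsonRoundTripSketch {
  def main(args: Array[String]): Unit = {
    val json = JsonProtocol.sparkEventToJson(MyCustomEvent("hello"))
    // Prints something like {"Event":"org.apache.spark.example.MyCustomEvent","description":"hello"};
    // the "Event" value is the class name added by the @JsonTypeInfo annotation.
    println(compact(render(json)))
    val restored = JsonProtocol.sparkEventFromJson(json) // back to MyCustomEvent("hello")
    println(restored)
  }
}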
@@ -0,0 +1 @@
org.apache.spark.sql.execution.ui.SQLHistoryListenerFactory
19 changes: 14 additions & 5 deletions sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -21,7 +21,6 @@ import java.beans.{BeanInfo, Introspector}
import java.util.Properties
import java.util.concurrent.atomic.AtomicReference


import scala.collection.JavaConverters._
import scala.collection.immutable
import scala.reflect.runtime.universe.TypeTag
@@ -1245,6 +1244,8 @@
*/
@transient private val instantiatedContext = new AtomicReference[SQLContext]()

@transient private val sqlListener = new AtomicReference[SQLListener]()

/**
* Get the singleton SQLContext if it exists or create a new one using the given SparkContext.
*
@@ -1284,6 +1285,10 @@
Option(instantiatedContext.get())
}

private[sql] def clearSqlListener(): Unit = {
sqlListener.set(null)
}

/**
* Changes the SQLContext that will be returned in this thread and its children when
* SQLContext.getOrCreate() is called. This can be used to ensure that a given thread receives
@@ -1332,9 +1337,13 @@
* Create a SQLListener then add it into SparkContext, and create an SQLTab if there is SparkUI.
*/
Review comment (Contributor): If this method is going to be a no-op under certain circumstances then those circumstances should be documented here.
private[sql] def createListenerAndUI(sc: SparkContext): SQLListener = {
val listener = new SQLListener(sc.conf)
sc.addSparkListener(listener)
sc.ui.foreach(new SQLTab(listener, _))
listener
if (sqlListener.get() == null) {
Review comment (Contributor): Given that it's still legal to have multiple active SQLContexts in an application (although this is not a good practice), isn't this a user-facing behavioral change? If a user creates multiple SQLContexts, then they will now only have one SQL tab in the UI and one SQLListener, which is different than the old behavior.
Review comment (Contributor): Also, is the only purpose of sqlListener to prevent multiple listeners from being created at the same time? If so, I think it would be better to use an AtomicBoolean so that we don't create another strong reference to a SQLListener, which might have a lot of internal state that could lead to memory leaks.
val listener = new SQLListener(sc.conf)
if (sqlListener.compareAndSet(null, listener)) {
sc.addSparkListener(listener)
sc.ui.foreach(new SQLTab(listener, _))
}
}
sqlListener.get()
}
}
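The guard above is the usual initialize-once pattern: threads may race into createListenerAndUI, but only the compareAndSet winner performs the side effects (registering the listener, creating the SQL tab), and every caller then returns the single published instance. A stripped-down sketch of the same pattern, with illustrative names:

import java.util.concurrent.atomic.AtomicReference

object InitOnceSketch {
  private val holder = new AtomicReference[String]()

  def getOrCreate(): String = {
    if (holder.get() == null) {
      val candidate = "expensive resource"
      // Only one thread wins the CAS; losers fall through without side effects.
      if (holder.compareAndSet(null, candidate)) {
        // Side effects (addSparkListener, SQLTab creation) belong only here.
      }
    }
    holder.get()
  }
}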
@@ -21,7 +21,8 @@ import java.util.concurrent.atomic.AtomicLong

import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.execution.ui.SparkPlanGraph
import org.apache.spark.sql.execution.ui.{SparkListenerSQLExecutionStart,
SparkListenerSQLExecutionEnd}
import org.apache.spark.util.Utils

private[sql] object SQLExecution {
@@ -45,25 +46,14 @@
sc.setLocalProperty(EXECUTION_ID_KEY, executionId.toString)
val r = try {
val callSite = Utils.getCallSite()
sqlContext.listener.onExecutionStart(
executionId,
callSite.shortForm,
callSite.longForm,
queryExecution.toString,
SparkPlanGraph(queryExecution.executedPlan),
System.currentTimeMillis())
sqlContext.sparkContext.listenerBus.post(SparkListenerSQLExecutionStart(
executionId, callSite.shortForm, callSite.longForm, queryExecution.toString,
SparkPlanInfo.fromSparkPlan(queryExecution.executedPlan), System.currentTimeMillis()))
Review comment (Contributor): I'll summon @marmbrus to see whether the change in semantics here is ok. The events would now be processed asynchronously instead of in the same thread.
Review comment (Contributor): This looks good to me, as SQLListener will handle the SparkListenerSQLExecutionStart event properly. [UPDATE] It also handles the SparkListenerSQLExecutionEnd event.
try {
body
} finally {
// Ideally, we need to make sure onExecutionEnd happens after onJobStart and onJobEnd.
// However, onJobStart and onJobEnd run in the listener thread. Because we cannot add new
// SQL event types to SparkListener since it's a public API, we cannot guarantee that.
//
// SQLListener should handle the case that onExecutionEnd happens before onJobEnd.
//
// The worst case is onExecutionEnd may happen before onJobStart when the listener thread
// is very busy. If so, we cannot track the jobs for the execution. It seems acceptable.
sqlContext.listener.onExecutionEnd(executionId, System.currentTimeMillis())
sqlContext.sparkContext.listenerBus.post(SparkListenerSQLExecutionEnd(
executionId, System.currentTimeMillis()))
}
} finally {
sc.setLocalProperty(EXECUTION_ID_KEY, null)
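On the consuming side, the SQL module's listener now receives the execution lifecycle through the generic onOtherEvent hook rather than through direct method calls from SQLExecution. The sketch below is illustrative rather than the actual SQLListener implementation; the field names are assumed from the constructor calls above.

import org.apache.spark.scheduler.{SparkListener, SparkListenerEvent}
import org.apache.spark.sql.execution.ui.{SparkListenerSQLExecutionEnd, SparkListenerSQLExecutionStart}

// Illustrative consumer: dispatches the SQL execution lifecycle out of the
// generic event stream. The real SQLListener also correlates job start/end
// events, which may arrive after the execution-end event.
class ExecutionTrackingListener extends SparkListener {
  override def onOtherEvent(event: SparkListenerEvent): Unit = event match {
    case start: SparkListenerSQLExecutionStart =>
      println(s"execution ${start.executionId} started: ${start.description}")
    case end: SparkListenerSQLExecutionEnd =>
      println(s"execution ${end.executionId} ended at ${end.time}")
    case _ => // not a SQL event; ignore
  }
}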
@@ -0,0 +1,46 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.execution

import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.sql.execution.metric.SQLMetricInfo
import org.apache.spark.util.Utils

/**
* :: DeveloperApi ::
* Stores information about a SQL SparkPlan.
*/
@DeveloperApi
class SparkPlanInfo(
val nodeName: String,
val simpleString: String,
val children: Seq[SparkPlanInfo],
val metrics: Seq[SQLMetricInfo])

private[sql] object SparkPlanInfo {

def fromSparkPlan(plan: SparkPlan): SparkPlanInfo = {
val metrics = plan.metrics.toSeq.map { case (key, metric) =>
new SQLMetricInfo(metric.name.getOrElse(key), metric.id,
Utils.getFormattedClassName(metric.param))
}
val children = plan.children.map(fromSparkPlan)

new SparkPlanInfo(plan.nodeName, plan.simpleString, children, metrics)
}
}
@@ -0,0 +1,30 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.execution.metric

import org.apache.spark.annotation.DeveloperApi

/**
* :: DeveloperApi ::
* Stores information about a SQL Metric.
*/
@DeveloperApi
class SQLMetricInfo(
val name: String,
val accumulatorId: Long,
val metricParam: String)