1 change: 1 addition & 0 deletions .rat-excludes
@@ -82,4 +82,5 @@ INDEX
gen-java.*
.*avpr
org.apache.spark.sql.sources.DataSourceRegister
org.apache.spark.scheduler.SparkHistoryListenerFactory
.*parquet
3 changes: 3 additions & 0 deletions core/src/main/java/org/apache/spark/JavaSparkListener.java
@@ -82,4 +82,7 @@ public void onExecutorRemoved(SparkListenerExecutorRemoved executorRemoved) { }
@Override
public void onBlockUpdated(SparkListenerBlockUpdated blockUpdated) { }

@Override
public void onOtherEvent(SparkListenerEvent event) { }

}
@@ -118,4 +118,8 @@ public void onBlockUpdated(SparkListenerBlockUpdated blockUpdated) {
onEvent(blockUpdated);
}

@Override
public void onOtherEvent(SparkListenerEvent event) {
onEvent(event);
}
}
@@ -207,6 +207,10 @@ private[spark] class EventLoggingListener(
// No-op because logging every update would be overkill
override def onExecutorMetricsUpdate(event: SparkListenerExecutorMetricsUpdate): Unit = { }

override def onOtherEvent(event: SparkListenerEvent): Unit = {
logEvent(event, flushLogger = true)
}

/**
* Stop logging events. The event log file will be renamed so that it loses the
* ".inprogress" suffix.
24 changes: 22 additions & 2 deletions core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
@@ -22,15 +22,19 @@ import java.util.Properties
import scala.collection.Map
import scala.collection.mutable

import org.apache.spark.{Logging, TaskEndReason}
import com.fasterxml.jackson.annotation.JsonTypeInfo

import org.apache.spark.{Logging, SparkConf, TaskEndReason}
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.executor.TaskMetrics
import org.apache.spark.scheduler.cluster.ExecutorInfo
import org.apache.spark.storage.{BlockManagerId, BlockUpdatedInfo}
import org.apache.spark.util.{Distribution, Utils}
import org.apache.spark.ui.SparkUI

@DeveloperApi
sealed trait SparkListenerEvent
@JsonTypeInfo(use = JsonTypeInfo.Id.CLASS, include = JsonTypeInfo.As.PROPERTY, property = "Event")
trait SparkListenerEvent

@DeveloperApi
case class SparkListenerStageSubmitted(stageInfo: StageInfo, properties: Properties = null)
@@ -130,6 +134,17 @@ case class SparkListenerApplicationEnd(time: Long) extends SparkListenerEvent
*/
private[spark] case class SparkListenerLogStart(sparkVersion: String) extends SparkListenerEvent

/**
* Interface for creating history listeners defined in other modules like SQL, which are used to
* rebuild the history UI.
*/
private[spark] trait SparkHistoryListenerFactory {
/**
* Create listeners used to rebuild the history UI.
*/
def createListeners(conf: SparkConf, sparkUI: SparkUI): Seq[SparkListener]
}

/**
* :: DeveloperApi ::
* Interface for listening to events from the Spark scheduler. Note that this is an internal
@@ -223,6 +238,11 @@ trait SparkListener {
* Called when the driver receives a block update info.
*/
def onBlockUpdated(blockUpdated: SparkListenerBlockUpdated) { }

/**
* Called when other events like SQL-specific events are posted.
*/
def onOtherEvent(event: SparkListenerEvent) { }
}

/**
@@ -61,6 +61,7 @@ private[spark] trait SparkListenerBus extends ListenerBus[SparkListener, SparkLi
case blockUpdated: SparkListenerBlockUpdated =>
listener.onBlockUpdated(blockUpdated)
case logStart: SparkListenerLogStart => // ignore event log metadata
case _ => listener.onOtherEvent(event)
}
}

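Taken together, the new onOtherEvent callback and the bus's `case _` fallback mean that events without a dedicated SparkListener method are still delivered to listeners. A minimal sketch of the pattern with hypothetical event and listener names (not part of this patch); note that posting goes through SparkContext.listenerBus, which is private[spark], so in practice only modules built into Spark, such as sql/core here, post custom events:

```scala
import org.apache.spark.scheduler.{SparkListener, SparkListenerEvent}

// Hypothetical event type; extending SparkListenerEvent is possible because the
// trait is no longer sealed.
case class MyModuleExecutionStart(executionId: Long, time: Long) extends SparkListenerEvent

// Hypothetical listener: events with no dedicated callback fall through
// SparkListenerBus's `case _` branch and arrive at onOtherEvent.
class MyModuleListener extends SparkListener {
  override def onOtherEvent(event: SparkListenerEvent): Unit = event match {
    case e: MyModuleExecutionStart =>
      println(s"execution ${e.executionId} started at ${e.time}")
    case _ => // some other module's event; ignore it
  }
}
```

A listener like this is registered with SparkContext.addSparkListener as usual, or created by a SparkHistoryListenerFactory (see below) when the history server replays an event log.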
16 changes: 14 additions & 2 deletions core/src/main/scala/org/apache/spark/ui/SparkUI.scala
@@ -17,10 +17,13 @@

package org.apache.spark.ui

import java.util.Date
import java.util.{Date, ServiceLoader}

import scala.collection.JavaConverters._

import org.apache.spark.status.api.v1.{ApiRootResource, ApplicationAttemptInfo, ApplicationInfo,
UIRoot}
import org.apache.spark.util.Utils
import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkContext}
import org.apache.spark.scheduler._
import org.apache.spark.storage.StorageStatusListener
@@ -154,7 +157,16 @@ private[spark] object SparkUI {
appName: String,
basePath: String,
startTime: Long): SparkUI = {
create(None, conf, listenerBus, securityManager, appName, basePath, startTime = startTime)
val sparkUI = create(
None, conf, listenerBus, securityManager, appName, basePath, startTime = startTime)

val listenerFactories = ServiceLoader.load(classOf[SparkHistoryListenerFactory],
Utils.getContextOrSparkClassLoader).asScala
listenerFactories.foreach { listenerFactory =>
val listeners = listenerFactory.createListeners(conf, sparkUI)
listeners.foreach(listenerBus.addListener)
}
sparkUI
}

/**
11 changes: 9 additions & 2 deletions core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -19,19 +19,21 @@ package org.apache.spark.util

import java.util.{Properties, UUID}

import org.apache.spark.scheduler.cluster.ExecutorInfo

import scala.collection.JavaConverters._
import scala.collection.Map

import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import org.json4s.DefaultFormats
import org.json4s.JsonDSL._
import org.json4s.JsonAST._
import org.json4s.jackson.JsonMethods._

import org.apache.spark._
import org.apache.spark.executor._
import org.apache.spark.rdd.RDDOperationScope
import org.apache.spark.scheduler._
import org.apache.spark.scheduler.cluster.ExecutorInfo
import org.apache.spark.storage._

/**
@@ -54,6 +56,8 @@ private[spark] object JsonProtocol {

private implicit val format = DefaultFormats

private val mapper = new ObjectMapper().registerModule(DefaultScalaModule)

/** ------------------------------------------------- *
* JSON serialization methods for SparkListenerEvents |
* -------------------------------------------------- */
@@ -96,6 +100,7 @@
executorMetricsUpdateToJson(metricsUpdate)
case blockUpdated: SparkListenerBlockUpdated =>
throw new MatchError(blockUpdated) // TODO(ekl) implement this
case _ => parse(mapper.writeValueAsString(event))
Contributor:
What is the reason to add this line?

Contributor:
It is very possible that we silently pull in unnecessary information. If we have new event types, we should handle those explicitly instead of relying on this line. I am proposing to revert this line.

Contributor:
As I've said in similar conversations in other contexts, I'm strongly opposed to what you're suggesting. In fact I'm an advocate for exactly the opposite, and that's why I filed SPARK-12141.

BTW, just removing that line would break the feature this patch is implementing, unless you write a whole lot of code to manually serialize all the SQL-related events.

Events are a public API, and they should be carefully crafted, since changing them affects user applications (including event logs). If there is unnecessary information in the event, then it's a bug in the event definition, not here.

Contributor:
> Events are a public API, and they should be carefully crafted, since changing them affects user applications (including event logs). If there is unnecessary information in the event, then it's a bug in the event definition, not here.

Yea, I totally agree. However, my concern is that having this line here makes it harder for developers to spot issues during development. Since the serialization works automatically, we are not making a self-review of what will be serialized, and of what methods will be called during serialization, a mandatory step, which makes auditing much harder. Although it is more work for the developer to handle every event explicitly, when we review a pull request we can clearly see what will be serialized and how each event is serialized. What do you think?

btw, if I am missing any context, please let me know :)

Contributor:
I'm perfectly ok with making auditing of these events harder if it means you're not writing manual serialization and deserialization code like JsonProtocol.scala. The drawbacks of the latter are much worse for code readability and maintainability.

Contributor:
BTW, this is really not the right forum to discuss this. If you want to discuss big changes like the one you're proposing, please comment on the bug I opened (referenced above) or start a thread on the mailing list.

Your suggestion of removing that line would just break the feature and, to restore it, would require an insane amount of code motion and new code to be written. To start with, the SQL events are not even available in "core", so you can't reference the classes here.

Contributor:
Yes, I think this is a terrible idea. Actually, back when we introduced magic serialization, I was against it.

Contributor:
Could you guys please comment on the bug I opened or on the mailing list? Commenting on a long-closed GitHub PR is not really the best forum.

I'd really like to understand why you think automatic serialization is a bad idea, since we use it in so many places. I think exactly the opposite: manual serialization is unmaintainable, error-prone, and a waste of developer time.

}
}

@@ -511,6 +516,8 @@ private[spark] object JsonProtocol {
case `executorRemoved` => executorRemovedFromJson(json)
case `logStart` => logStartFromJson(json)
case `metricsUpdate` => executorMetricsUpdateFromJson(json)
case other => mapper.readValue(compact(render(json)), Utils.classForName(other))
.asInstanceOf[SparkListenerEvent]
}
}

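To make the fallback concrete: because SparkListenerEvent now carries @JsonTypeInfo(use = Id.CLASS, include = As.PROPERTY, property = "Event"), Jackson writes the concrete class name into the "Event" field on the way out, and the `case other =>` branch above resolves that name with Utils.classForName on the way back in. A rough round-trip sketch with a hypothetical event type (the real SQL events live in sql/core, not in this file):

```scala
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import org.apache.spark.scheduler.SparkListenerEvent

// Hypothetical stand-in for a module-specific event such as SparkListenerSQLExecutionEnd.
case class MyModuleExecutionEnd(executionId: Long, time: Long) extends SparkListenerEvent

object EventRoundTripSketch {
  // Same mapper setup as JsonProtocol uses.
  private val mapper = new ObjectMapper().registerModule(DefaultScalaModule)

  def main(args: Array[String]): Unit = {
    val json = mapper.writeValueAsString(MyModuleExecutionEnd(1L, 42L))
    // The @JsonTypeInfo annotation inherited from SparkListenerEvent adds the
    // concrete class name, so the output looks roughly like:
    //   {"Event":"<fully.qualified.MyModuleExecutionEnd>","executionId":1,"time":42}
    println(json)

    // JsonProtocol.sparkEventFromJson does the same resolution dynamically with
    // Utils.classForName(<value of "Event">) instead of a compile-time classOf.
    val event: SparkListenerEvent = mapper.readValue(json, classOf[MyModuleExecutionEnd])
    println(event)
  }
}
```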
@@ -0,0 +1 @@
org.apache.spark.sql.execution.ui.SQLHistoryListenerFactory
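The single line above is a standard java.util.ServiceLoader registration: a resource under META-INF/services, named after the org.apache.spark.scheduler.SparkHistoryListenerFactory trait, containing the fully qualified name of the implementation. The history-server path in SparkUI shown earlier loads every factory registered this way and adds its listeners to the replay bus. A minimal sketch of such a factory; the names below are illustrative rather than the actual SQL classes, and because the trait is private[spark] an implementation has to live under the org.apache.spark package, as the SQL one does:

```scala
package org.apache.spark.mymodule.ui

import org.apache.spark.SparkConf
import org.apache.spark.scheduler.{SparkHistoryListenerFactory, SparkListener, SparkListenerEvent}
import org.apache.spark.ui.SparkUI

// Hypothetical listener that rebuilds module-specific UI state from events
// replayed out of the event log (custom events arrive through onOtherEvent).
class MyHistoryListener(conf: SparkConf, ui: SparkUI) extends SparkListener {
  override def onOtherEvent(event: SparkListenerEvent): Unit = {
    // inspect module-specific events and update UI data structures here
  }
}

// Hypothetical factory discovered via ServiceLoader; it needs a public no-arg
// constructor and is registered by listing its fully qualified name in
// META-INF/services/org.apache.spark.scheduler.SparkHistoryListenerFactory.
class MyHistoryListenerFactory extends SparkHistoryListenerFactory {
  override def createListeners(conf: SparkConf, sparkUI: SparkUI): Seq[SparkListener] =
    Seq(new MyHistoryListener(conf, sparkUI))
}
```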
19 changes: 15 additions & 4 deletions sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -1236,6 +1236,7 @@ class SQLContext private[sql](
sparkContext.addSparkListener(new SparkListener {
override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd): Unit = {
SQLContext.clearInstantiatedContext()
SQLContext.clearSqlListener()
}
})

@@ -1263,6 +1264,8 @@ object SQLContext {
*/
@transient private val instantiatedContext = new AtomicReference[SQLContext]()

@transient private val sqlListener = new AtomicReference[SQLListener]()
Contributor:
Just to make sure that it's not overlooked, please see my comment on the other PRs regarding whether this needs to actually hold a SQLListener instance or whether it can simply be an AtomicBoolean.

Contributor (Author):
Many unit tests use sqlContext.listener. Can you please suggest how to update the unit tests if we changed to use an AtomicBoolean?

Contributor:
Ah, I see that we need this in order to be able to return a value from createListenerAndUI.


/**
* Get the singleton SQLContext if it exists or create a new one using the given SparkContext.
*
@@ -1307,6 +1310,10 @@ object SQLContext {
Option(instantiatedContext.get())
}

private[sql] def clearSqlListener(): Unit = {
sqlListener.set(null)
}

/**
* Changes the SQLContext that will be returned in this thread and its children when
* SQLContext.getOrCreate() is called. This can be used to ensure that a given thread receives
@@ -1355,9 +1362,13 @@ object SQLContext {
* Create a SQLListener then add it into SparkContext, and create an SQLTab if there is SparkUI.
*/
private[sql] def createListenerAndUI(sc: SparkContext): SQLListener = {
val listener = new SQLListener(sc.conf)
sc.addSparkListener(listener)
sc.ui.foreach(new SQLTab(listener, _))
listener
if (sqlListener.get() == null) {
val listener = new SQLListener(sc.conf)
if (sqlListener.compareAndSet(null, listener)) {
sc.addSparkListener(listener)
sc.ui.foreach(new SQLTab(listener, _))
}
}
sqlListener.get()
}
}
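As the thread above notes, the listener is kept in an AtomicReference rather than an AtomicBoolean because createListenerAndUI must both initialize at most once and hand the shared SQLListener back to its caller (and to sqlContext.listener in tests). The generic shape of that idiom, as a small illustration rather than the actual Spark code:

```scala
import java.util.concurrent.atomic.AtomicReference

// Illustrative only: the reference doubles as the "already initialized" flag and
// as the value returned to every caller, which an AtomicBoolean alone cannot do.
class LazyShared[T >: Null <: AnyRef](create: () => T) {
  private val ref = new AtomicReference[T]()

  def getOrCreate(): T = {
    if (ref.get() == null) {
      val candidate = create()
      // Only one thread wins the race; the others fall through and read the
      // winner's value below (a losing candidate is simply discarded).
      ref.compareAndSet(null, candidate)
    }
    ref.get()
  }
}
```

In createListenerAndUI the same shape appears inline: the SQLListener is constructed, compareAndSet decides the winner, and only the winner registers the listener with the SparkContext and creates the SQLTab.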
@@ -21,7 +21,8 @@ import java.util.concurrent.atomic.AtomicLong

import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.execution.ui.SparkPlanGraph
import org.apache.spark.sql.execution.ui.{SparkListenerSQLExecutionStart,
SparkListenerSQLExecutionEnd}
import org.apache.spark.util.Utils

private[sql] object SQLExecution {
@@ -45,25 +46,14 @@ private[sql] object SQLExecution {
sc.setLocalProperty(EXECUTION_ID_KEY, executionId.toString)
val r = try {
val callSite = Utils.getCallSite()
sqlContext.listener.onExecutionStart(
executionId,
callSite.shortForm,
callSite.longForm,
queryExecution.toString,
SparkPlanGraph(queryExecution.executedPlan),
System.currentTimeMillis())
sqlContext.sparkContext.listenerBus.post(SparkListenerSQLExecutionStart(
executionId, callSite.shortForm, callSite.longForm, queryExecution.toString,
SparkPlanInfo.fromSparkPlan(queryExecution.executedPlan), System.currentTimeMillis()))
try {
body
} finally {
// Ideally, we need to make sure onExecutionEnd happens after onJobStart and onJobEnd.
// However, onJobStart and onJobEnd run in the listener thread. Because we cannot add new
// SQL event types to SparkListener since it's a public API, we cannot guarantee that.
//
// SQLListener should handle the case that onExecutionEnd happens before onJobEnd.
//
// The worst case is onExecutionEnd may happen before onJobStart when the listener thread
// is very busy. If so, we cannot track the jobs for the execution. It seems acceptable.
sqlContext.listener.onExecutionEnd(executionId, System.currentTimeMillis())
sqlContext.sparkContext.listenerBus.post(SparkListenerSQLExecutionEnd(
executionId, System.currentTimeMillis()))
}
} finally {
sc.setLocalProperty(EXECUTION_ID_KEY, null)
@@ -0,0 +1,46 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.execution

import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.sql.execution.metric.SQLMetricInfo
import org.apache.spark.util.Utils

/**
* :: DeveloperApi ::
* Stores information about a SQL SparkPlan.
*/
@DeveloperApi
class SparkPlanInfo(
val nodeName: String,
val simpleString: String,
val children: Seq[SparkPlanInfo],
val metrics: Seq[SQLMetricInfo])

private[sql] object SparkPlanInfo {

def fromSparkPlan(plan: SparkPlan): SparkPlanInfo = {
val metrics = plan.metrics.toSeq.map { case (key, metric) =>
new SQLMetricInfo(metric.name.getOrElse(key), metric.id,
Utils.getFormattedClassName(metric.param))
}
val children = plan.children.map(fromSparkPlan)

new SparkPlanInfo(plan.nodeName, plan.simpleString, children, metrics)
}
}
@@ -0,0 +1,30 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.execution.metric

import org.apache.spark.annotation.DeveloperApi

/**
* :: DeveloperApi ::
* Stores information about a SQL Metric.
*/
@DeveloperApi
class SQLMetricInfo(
val name: String,
val accumulatorId: Long,
val metricParam: String)