@@ -19,6 +19,11 @@ package org.apache.spark.sql.execution.ui

import scala.collection.mutable

import com.fasterxml.jackson.databind.JavaType
import com.fasterxml.jackson.databind.`type`.TypeFactory
import com.fasterxml.jackson.databind.annotation.JsonDeserialize
import com.fasterxml.jackson.databind.util.Converter

import org.apache.spark.{JobExecutionStatus, SparkConf}
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.internal.Logging
@@ -43,9 +48,41 @@ case class SparkListenerSQLExecutionEnd(executionId: Long, time: Long)
  extends SparkListenerEvent

@DeveloperApi
case class SparkListenerDriverAccumUpdates(executionId: Long, accumUpdates: Seq[(Long, Long)])
case class SparkListenerDriverAccumUpdates(
    executionId: Long,
    @JsonDeserialize(contentConverter = classOf[LongLongTupleConverter])
    accumUpdates: Seq[(Long, Long)])
  extends SparkListenerEvent

/**
* Jackson [[Converter]] for converting an (Int, Int) tuple into a (Long, Long) tuple.
*
* This is necessary due to limitations in how Jackson's scala module deserializes primitives;
* see the "Deserializing Option[Int] and other primitive challenges" section in
* https://github.com/FasterXML/jackson-module-scala/wiki/FAQ for a discussion of this issue and
* SPARK-18462 for the specific problem that motivated this conversion.
*/
private class LongLongTupleConverter extends Converter[(Object, Object), (Long, Long)] {

  override def convert(in: (Object, Object)): (Long, Long) = {
    def toLong(a: Object): Long = a match {
      case i: java.lang.Integer => i.intValue()
      case l: java.lang.Long => l.longValue()
    }
    (toLong(in._1), toLong(in._2))
  }

  override def getInputType(typeFactory: TypeFactory): JavaType = {
    val objectType = typeFactory.uncheckedSimpleType(classOf[Object])
    typeFactory.constructSimpleType(classOf[(_, _)], classOf[(_, _)], Array(objectType, objectType))
  }

  override def getOutputType(typeFactory: TypeFactory): JavaType = {
    val longType = typeFactory.uncheckedSimpleType(classOf[Long])
    typeFactory.constructSimpleType(classOf[(_, _)], classOf[(_, _)], Array(longType, longType))
  }
}

class SQLHistoryListenerFactory extends SparkHistoryListenerFactory {

  override def createListeners(conf: SparkConf, sparkUI: SparkUI): Seq[SparkListener] = {
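As an aside on the fix above: below is a minimal, self-contained sketch of the failure mode that the LongLongTupleConverter scaladoc describes. It assumes jackson-module-scala is on the classpath; the Updates case class and PrimitiveErasureSketch object are made-up names for illustration and are not part of this patch. Because the element type of Seq[(Long, Long)] is erased, Jackson's Scala module materializes small JSON numbers as boxed java.lang.Integer values, and unboxing them as Longs at use time fails.

import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.scala.DefaultScalaModule

// Hypothetical example type, not part of the patch.
case class Updates(accumUpdates: Seq[(Long, Long)])

object PrimitiveErasureSketch {
  def main(args: Array[String]): Unit = {
    val mapper = new ObjectMapper().registerModule(DefaultScalaModule)
    val parsed = mapper.readValue("""{"accumUpdates": [[2, 3]]}""", classOf[Updates])
    // Statically (a, b) are Longs, but at runtime each element is a boxed
    // java.lang.Integer, so unboxing it as a Long is expected to throw a
    // ClassCastException -- the behavior reported in SPARK-18462.
    parsed.accumUpdates.foreach { case (a, b) => println(a + b) }
  }
}

The @JsonDeserialize(contentConverter = classOf[LongLongTupleConverter]) annotation added above avoids this by running every deserialized tuple through the converter, which normalizes both elements to Long regardless of how Jackson boxed them.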
@@ -19,6 +19,7 @@ package org.apache.spark.sql.execution.ui

import java.util.Properties

import org.json4s.jackson.JsonMethods._
import org.mockito.Mockito.mock

import org.apache.spark._
@@ -35,10 +36,10 @@ import org.apache.spark.sql.execution.{LeafExecNode, QueryExecution, SparkPlanIn
import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
import org.apache.spark.sql.test.SharedSQLContext
import org.apache.spark.ui.SparkUI
import org.apache.spark.util.{AccumulatorMetadata, LongAccumulator}
import org.apache.spark.util.{AccumulatorMetadata, JsonProtocol, LongAccumulator}


class SQLListenerSuite extends SparkFunSuite with SharedSQLContext {
class SQLListenerSuite extends SparkFunSuite with SharedSQLContext with JsonTestUtils {
  import testImplicits._
  import org.apache.spark.AccumulatorSuite.makeInfo

@@ -416,6 +417,45 @@ class SQLListenerSuite extends SparkFunSuite with SharedSQLContext {
    assert(driverUpdates(physicalPlan.longMetric("dummy").id) == expectedAccumValue)
  }

test("roundtripping SparkListenerDriverAccumUpdates through JsonProtocol (SPARK-18462)") {
val event = SparkListenerDriverAccumUpdates(1L, Seq((2L, 3L)))
val json = JsonProtocol.sparkEventToJson(event)
assertValidDataInJson(json,
parse("""
|{
| "Event": "org.apache.spark.sql.execution.ui.SparkListenerDriverAccumUpdates",
| "executionId": 1,
| "accumUpdates": [[2,3]]
|}
""".stripMargin))
JsonProtocol.sparkEventFromJson(json) match {
case SparkListenerDriverAccumUpdates(executionId, accums) =>
assert(executionId == 1L)
accums.foreach { case (a, b) =>
assert(a == 2L)
assert(b == 3L)
}
}

// Test a case where the numbers in the JSON can only fit in longs:
val longJson = parse(
"""
|{
| "Event": "org.apache.spark.sql.execution.ui.SparkListenerDriverAccumUpdates",
| "executionId": 4294967294,
| "accumUpdates": [[4294967294,3]]
|}
""".stripMargin)
JsonProtocol.sparkEventFromJson(longJson) match {
case SparkListenerDriverAccumUpdates(executionId, accums) =>
assert(executionId == 4294967294L)
accums.foreach { case (a, b) =>
assert(a == 4294967294L)
assert(b == 3L)
}
}
}

}

