From c8850d13fe13597978b48827efb2d423c42c442e Mon Sep 17 00:00:00 2001 From: Sean Owen Date: Mon, 13 Jun 2016 14:45:53 +0100 Subject: [PATCH 1/2] Clean up several build warnings, mostly due to internal use of old accumulators --- core/pom.xml | 6 ++-- .../spark/scheduler/DAGSchedulerSuite.scala | 12 +++---- .../scheduler/SchedulerIntegrationSuite.scala | 1 + .../spark/scheduler/TaskContextSuite.scala | 9 +++-- .../spark/sql/execution/debug/package.scala | 34 +++++++++---------- .../execution/metric/SQLMetricsSuite.scala | 10 ------ .../deploy/yarn/YarnAllocatorSuite.scala | 1 + 7 files changed, 31 insertions(+), 42 deletions(-) diff --git a/core/pom.xml b/core/pom.xml index f5fdb4069601..90c8f97f2bba 100644 --- a/core/pom.xml +++ b/core/pom.xml @@ -356,12 +356,12 @@ generate-resources - + - + - + run diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala index f28f429e0c54..3c30ec8ee8e3 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala @@ -1602,13 +1602,11 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou } test("misbehaved accumulator should not crash DAGScheduler and SparkContext") { - val acc = new Accumulator[Int](0, new AccumulatorParam[Int] { - override def addAccumulator(t1: Int, t2: Int): Int = t1 + t2 - override def zero(initialValue: Int): Int = 0 - override def addInPlace(r1: Int, r2: Int): Int = { - throw new DAGSchedulerSuiteDummyException - } - }) + val acc = new LongAccumulator { + override def add(v: java.lang.Long): Unit = throw new DAGSchedulerSuiteDummyException + override def add(v: Long): Unit = throw new DAGSchedulerSuiteDummyException + } + sc.register(acc) // Run this on executors sc.parallelize(1 to 10, 2).foreach { item => acc.add(1) } diff --git a/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala index 5271a5671ad3..54b7312991ce 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala @@ -22,6 +22,7 @@ import java.util.concurrent.atomic.AtomicBoolean import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet} import scala.concurrent.{Await, Future} import scala.concurrent.duration.{Duration, SECONDS} +import scala.language.existentials import scala.reflect.ClassTag import org.scalactic.TripleEquals diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskContextSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskContextSuite.scala index 368668bc7e2e..9eda79ace18d 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/TaskContextSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/TaskContextSuite.scala @@ -146,14 +146,13 @@ class TaskContextSuite extends SparkFunSuite with BeforeAndAfter with LocalSpark test("accumulators are updated on exception failures") { // This means use 1 core and 4 max task failures sc = new SparkContext("local[1,4]", "test") - val param = AccumulatorParam.LongAccumulatorParam // Create 2 accumulators, one that counts failed values and another that doesn't - val acc1 = new Accumulator(0L, param, Some("x"), countFailedValues = true) - val acc2 = new Accumulator(0L, param, Some("y"), countFailedValues = false) + val acc1 = AccumulatorSuite.createLongAccum("x", true) + val acc2 = AccumulatorSuite.createLongAccum("y", false) // Fail first 3 attempts of every task. This means each task should be run 4 times. sc.parallelize(1 to 10, 10).map { i => - acc1 += 1 - acc2 += 1 + acc1.add(1) + acc2.add(1) if (TaskContext.get.attemptNumber() <= 2) { throw new Exception("you did something wrong") } else { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala index f2c558ac2de7..e89f792496d6 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala @@ -19,7 +19,6 @@ package org.apache.spark.sql.execution import scala.collection.mutable.HashSet -import org.apache.spark.{Accumulator, AccumulatorParam} import org.apache.spark.internal.Logging import org.apache.spark.rdd.RDD import org.apache.spark.sql._ @@ -28,7 +27,7 @@ import org.apache.spark.sql.catalyst.expressions.Attribute import org.apache.spark.sql.catalyst.expressions.codegen.{CodeFormatter, CodegenContext, ExprCode} import org.apache.spark.sql.catalyst.trees.TreeNodeRef import org.apache.spark.sql.internal.SQLConf -import org.apache.spark.util.LongAccumulator +import org.apache.spark.util.{AccumulatorV2, LongAccumulator} /** * Contains methods for debugging query execution. @@ -108,26 +107,27 @@ package object debug { private[sql] case class DebugExec(child: SparkPlan) extends UnaryExecNode with CodegenSupport { def output: Seq[Attribute] = child.output - implicit object SetAccumulatorParam extends AccumulatorParam[HashSet[String]] { - def zero(initialValue: HashSet[String]): HashSet[String] = { - initialValue.clear() - initialValue - } - - def addInPlace(v1: HashSet[String], v2: HashSet[String]): HashSet[String] = { - v1 ++= v2 - v1 + class SetAccumulator[T] extends AccumulatorV2[T, HashSet[T]] { + private val _set = new HashSet[T]() + override def isZero: Boolean = _set.isEmpty + override def copy(): AccumulatorV2[T, HashSet[T]] = { + val newAcc = new SetAccumulator[T]() + newAcc._set ++= _set + newAcc } + override def reset(): Unit = _set.clear() + override def add(v: T): Unit = _set += v + override def merge(other: AccumulatorV2[T, HashSet[T]]): Unit = _set ++= other.value + override def value: HashSet[T] = _set } /** * A collection of metrics for each column of output. - * - * @param elementTypes the actual runtime types for the output. Useful when there are bugs - * causing the wrong data to be projected. */ - case class ColumnMetrics( - elementTypes: Accumulator[HashSet[String]] = sparkContext.accumulator(HashSet.empty)) + case class ColumnMetrics() { + val elementTypes = new SetAccumulator[String] + sparkContext.register(elementTypes) + } val tupleCount: LongAccumulator = sparkContext.longAccumulator @@ -155,7 +155,7 @@ package object debug { while (i < numColumns) { val value = currentRow.get(i, output(i).dataType) if (value != null) { - columnStats(i).elementTypes += HashSet(value.getClass.getName) + columnStats(i).elementTypes.add(value.getClass.getName) } i += 1 } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala index fd956bc4ef90..b47ee8aeca06 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala @@ -49,16 +49,6 @@ class SQLMetricsSuite extends SparkFunSuite with SharedSQLContext { assert(boxingFinder.boxingInvokes.isEmpty, s"Found boxing: ${boxingFinder.boxingInvokes}") } - test("Normal accumulator should do boxing") { - // We need this test to make sure BoxingFinder works. - val l = sparkContext.accumulator(0L) - val f = () => { l += 1L } - val cl = BoxingFinder.getClassReader(f.getClass) - val boxingFinder = new BoxingFinder() - cl.accept(boxingFinder, 0) - assert(boxingFinder.boxingInvokes.nonEmpty, "Found find boxing in this test") - } - /** * Call `df.collect()` and verify if the collected metrics are same as "expectedMetrics". * diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala index f4f8bd435d5f..207dbf56d360 100644 --- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala +++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala @@ -111,6 +111,7 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter } def createContainer(host: String): Container = { + // When YARN 2.6+ is required, avoid deprecation by using version with long second arg val containerId = ContainerId.newInstance(appAttemptId, containerNum) containerNum += 1 val nodeId = NodeId.newInstance(host, 1000) From 20c2a6b98de236e0a269a1ed5e3ad2f370c7b8e7 Mon Sep 17 00:00:00 2001 From: Sean Owen Date: Tue, 14 Jun 2016 09:39:28 +0100 Subject: [PATCH 2/2] Remove other obsolete accumulator boxing test, and support code --- .../execution/metric/SQLMetricsSuite.scala | 95 +------------------ 1 file changed, 1 insertion(+), 94 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala index b47ee8aeca06..579a095ff000 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala @@ -17,13 +17,6 @@ package org.apache.spark.sql.execution.metric -import java.io.{ByteArrayInputStream, ByteArrayOutputStream} - -import scala.collection.mutable - -import org.apache.xbean.asm5._ -import org.apache.xbean.asm5.Opcodes._ - import org.apache.spark.SparkFunSuite import org.apache.spark.sql._ import org.apache.spark.sql.execution.SparkPlanInfo @@ -31,24 +24,11 @@ import org.apache.spark.sql.execution.ui.SparkPlanGraph import org.apache.spark.sql.functions._ import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.test.SharedSQLContext -import org.apache.spark.util.{AccumulatorContext, JsonProtocol, Utils} - +import org.apache.spark.util.{AccumulatorContext, JsonProtocol} class SQLMetricsSuite extends SparkFunSuite with SharedSQLContext { import testImplicits._ - test("SQLMetric should not box Long") { - val l = SQLMetrics.createMetric(sparkContext, "long") - val f = () => { - l += 1L - l.add(1L) - } - val cl = BoxingFinder.getClassReader(f.getClass) - val boxingFinder = new BoxingFinder() - cl.accept(boxingFinder, 0) - assert(boxingFinder.boxingInvokes.isEmpty, s"Found boxing: ${boxingFinder.boxingInvokes}") - } - /** * Call `df.collect()` and verify if the collected metrics are same as "expectedMetrics". * @@ -313,76 +293,3 @@ class SQLMetricsSuite extends SparkFunSuite with SharedSQLContext { } } - -private case class MethodIdentifier[T](cls: Class[T], name: String, desc: String) - -/** - * If `method` is null, search all methods of this class recursively to find if they do some boxing. - * If `method` is specified, only search this method of the class to speed up the searching. - * - * This method will skip the methods in `visitedMethods` to avoid potential infinite cycles. - */ -private class BoxingFinder( - method: MethodIdentifier[_] = null, - val boxingInvokes: mutable.Set[String] = mutable.Set.empty, - visitedMethods: mutable.Set[MethodIdentifier[_]] = mutable.Set.empty) - extends ClassVisitor(ASM5) { - - private val primitiveBoxingClassName = - Set("java/lang/Long", - "java/lang/Double", - "java/lang/Integer", - "java/lang/Float", - "java/lang/Short", - "java/lang/Character", - "java/lang/Byte", - "java/lang/Boolean") - - override def visitMethod( - access: Int, name: String, desc: String, sig: String, exceptions: Array[String]): - MethodVisitor = { - if (method != null && (method.name != name || method.desc != desc)) { - // If method is specified, skip other methods. - return new MethodVisitor(ASM5) {} - } - - new MethodVisitor(ASM5) { - override def visitMethodInsn( - op: Int, owner: String, name: String, desc: String, itf: Boolean) { - if (op == INVOKESPECIAL && name == "" || op == INVOKESTATIC && name == "valueOf") { - if (primitiveBoxingClassName.contains(owner)) { - // Find boxing methods, e.g, new java.lang.Long(l) or java.lang.Long.valueOf(l) - boxingInvokes.add(s"$owner.$name") - } - } else { - // scalastyle:off classforname - val classOfMethodOwner = Class.forName(owner.replace('/', '.'), false, - Thread.currentThread.getContextClassLoader) - // scalastyle:on classforname - val m = MethodIdentifier(classOfMethodOwner, name, desc) - if (!visitedMethods.contains(m)) { - // Keep track of visited methods to avoid potential infinite cycles - visitedMethods += m - val cl = BoxingFinder.getClassReader(classOfMethodOwner) - visitedMethods += m - cl.accept(new BoxingFinder(m, boxingInvokes, visitedMethods), 0) - } - } - } - } - } -} - -private object BoxingFinder { - - def getClassReader(cls: Class[_]): ClassReader = { - val className = cls.getName.replaceFirst("^.*\\.", "") + ".class" - val resourceStream = cls.getResourceAsStream(className) - val baos = new ByteArrayOutputStream(128) - // Copy data over, before delegating to ClassReader - - // else we can run out of open file handles. - Utils.copyStream(resourceStream, baos, true) - new ClassReader(new ByteArrayInputStream(baos.toByteArray)) - } - -}