Commit 2453922

[MINOR] Clean up several build warnings, mostly due to internal use of old accumulators
Another PR to clean up recent build warnings. This particularly cleans up several instances of the old accumulator API usage in tests that are straightforward to update. I think this qualifies as "minor".

Tested via Jenkins.

Author: Sean Owen <[email protected]>

Closes #13642 from srowen/BuildWarnings.

(cherry picked from commit 6151d26)
Signed-off-by: Sean Owen <[email protected]>
1 parent e03c251 commit 2453922
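
For context, the cleanup replaces uses of the deprecated Accumulator/AccumulatorParam API with AccumulatorV2. A minimal sketch of the migration pattern, not part of the commit itself (the object and accumulator names below are illustrative):

import org.apache.spark.SparkContext
import org.apache.spark.util.LongAccumulator

// Illustrative only: the old style was `val acc = sc.accumulator(0L)` with `acc += 1`;
// the new style instantiates an AccumulatorV2 subclass and registers it explicitly.
object AccumulatorMigrationSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext("local[2]", "accumulator-migration-sketch")
    val evens = new LongAccumulator
    sc.register(evens, "evens")
    sc.parallelize(1 to 10, 2).foreach { i => if (i % 2 == 0) evens.add(1L) }
    println(s"even count = ${evens.value}")  // merged task updates, read on the driver
    sc.stop()
  }
}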

6 files changed: +31, -136 lines


core/pom.xml

Lines changed: 3 additions & 3 deletions
@@ -356,12 +356,12 @@
       <phase>generate-resources</phase>
       <configuration>
         <!-- Execute the shell script to generate the spark build information. -->
-        <tasks>
+        <target>
           <exec executable="${project.basedir}/../build/spark-build-info">
             <arg value="${project.build.directory}/extra-resources"/>
-            <arg value="${pom.version}"/>
+            <arg value="${project.version}"/>
           </exec>
-        </tasks>
+        </target>
       </configuration>
       <goals>
         <goal>run</goal>

core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala

Lines changed: 5 additions & 7 deletions
@@ -1593,13 +1593,11 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou
   }

   test("misbehaved accumulator should not crash DAGScheduler and SparkContext") {
-    val acc = new Accumulator[Int](0, new AccumulatorParam[Int] {
-      override def addAccumulator(t1: Int, t2: Int): Int = t1 + t2
-      override def zero(initialValue: Int): Int = 0
-      override def addInPlace(r1: Int, r2: Int): Int = {
-        throw new DAGSchedulerSuiteDummyException
-      }
-    })
+    val acc = new LongAccumulator {
+      override def add(v: java.lang.Long): Unit = throw new DAGSchedulerSuiteDummyException
+      override def add(v: Long): Unit = throw new DAGSchedulerSuiteDummyException
+    }
+    sc.register(acc)

     // Run this on executors
     sc.parallelize(1 to 10, 2).foreach { item => acc.add(1) }

core/src/test/scala/org/apache/spark/scheduler/TaskContextSuite.scala

Lines changed: 4 additions & 5 deletions
@@ -146,14 +146,13 @@ class TaskContextSuite extends SparkFunSuite with BeforeAndAfter with LocalSpark
   test("accumulators are updated on exception failures") {
     // This means use 1 core and 4 max task failures
     sc = new SparkContext("local[1,4]", "test")
-    val param = AccumulatorParam.LongAccumulatorParam
     // Create 2 accumulators, one that counts failed values and another that doesn't
-    val acc1 = new Accumulator(0L, param, Some("x"), countFailedValues = true)
-    val acc2 = new Accumulator(0L, param, Some("y"), countFailedValues = false)
+    val acc1 = AccumulatorSuite.createLongAccum("x", true)
+    val acc2 = AccumulatorSuite.createLongAccum("y", false)
     // Fail first 3 attempts of every task. This means each task should be run 4 times.
     sc.parallelize(1 to 10, 10).map { i =>
-      acc1 += 1
-      acc2 += 1
+      acc1.add(1)
+      acc2.add(1)
       if (TaskContext.get.attemptNumber() <= 2) {
         throw new Exception("you did something wrong")
       } else {
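
The helper AccumulatorSuite.createLongAccum used above is not shown in this diff. A rough sketch of what such a helper presumably does (the signature and the handling of countFailedValues are assumptions; the real helper relies on Spark-internal registration):

import org.apache.spark.SparkContext
import org.apache.spark.util.LongAccumulator

// Hypothetical stand-in: build a named LongAccumulator and register it with the active
// context. The real helper also wires countFailedValues into the accumulator's metadata,
// which is Spark-internal and elided here.
def createLongAccum(sc: SparkContext, name: String, countFailedValues: Boolean): LongAccumulator = {
  val acc = new LongAccumulator
  sc.register(acc, name)
  acc
}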

sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala

Lines changed: 17 additions & 17 deletions
@@ -19,7 +19,6 @@ package org.apache.spark.sql.execution

 import scala.collection.mutable.HashSet

-import org.apache.spark.{Accumulator, AccumulatorParam}
 import org.apache.spark.internal.Logging
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql._
@@ -28,7 +27,7 @@ import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.catalyst.expressions.codegen.{CodeFormatter, CodegenContext, ExprCode}
 import org.apache.spark.sql.catalyst.trees.TreeNodeRef
 import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.util.LongAccumulator
+import org.apache.spark.util.{AccumulatorV2, LongAccumulator}

 /**
  * Contains methods for debugging query execution.
@@ -108,26 +107,27 @@ package object debug {
   private[sql] case class DebugExec(child: SparkPlan) extends UnaryExecNode with CodegenSupport {
     def output: Seq[Attribute] = child.output

-    implicit object SetAccumulatorParam extends AccumulatorParam[HashSet[String]] {
-      def zero(initialValue: HashSet[String]): HashSet[String] = {
-        initialValue.clear()
-        initialValue
-      }
-
-      def addInPlace(v1: HashSet[String], v2: HashSet[String]): HashSet[String] = {
-        v1 ++= v2
-        v1
+    class SetAccumulator[T] extends AccumulatorV2[T, HashSet[T]] {
+      private val _set = new HashSet[T]()
+      override def isZero: Boolean = _set.isEmpty
+      override def copy(): AccumulatorV2[T, HashSet[T]] = {
+        val newAcc = new SetAccumulator[T]()
+        newAcc._set ++= _set
+        newAcc
       }
+      override def reset(): Unit = _set.clear()
+      override def add(v: T): Unit = _set += v
+      override def merge(other: AccumulatorV2[T, HashSet[T]]): Unit = _set ++= other.value
+      override def value: HashSet[T] = _set
     }

     /**
      * A collection of metrics for each column of output.
-     *
-     * @param elementTypes the actual runtime types for the output. Useful when there are bugs
-     *                     causing the wrong data to be projected.
      */
-    case class ColumnMetrics(
-      elementTypes: Accumulator[HashSet[String]] = sparkContext.accumulator(HashSet.empty))
+    case class ColumnMetrics() {
+      val elementTypes = new SetAccumulator[String]
+      sparkContext.register(elementTypes)
+    }

     val tupleCount: LongAccumulator = sparkContext.longAccumulator

@@ -155,7 +155,7 @@ package object debug {
     while (i < numColumns) {
       val value = currentRow.get(i, output(i).dataType)
       if (value != null) {
-        columnStats(i).elementTypes += HashSet(value.getClass.getName)
+        columnStats(i).elementTypes.add(value.getClass.getName)
       }
       i += 1
     }
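
As a rough usage sketch of the new accumulator style (the class, function, and value names below are illustrative, not from the commit), an AccumulatorV2 like the SetAccumulator above is registered explicitly and read on the driver after an action:

import scala.collection.mutable.HashSet
import org.apache.spark.SparkContext
import org.apache.spark.util.AccumulatorV2

// Standalone equivalent of the SetAccumulator added to DebugExec, specialized to String.
class StringSetAccumulator extends AccumulatorV2[String, HashSet[String]] {
  private val set = new HashSet[String]()
  override def isZero: Boolean = set.isEmpty
  override def copy(): StringSetAccumulator = { val acc = new StringSetAccumulator; acc.set ++= set; acc }
  override def reset(): Unit = set.clear()
  override def add(v: String): Unit = set += v
  override def merge(other: AccumulatorV2[String, HashSet[String]]): Unit = set ++= other.value
  override def value: HashSet[String] = set
}

def distinctClassNames(sc: SparkContext): HashSet[String] = {
  val types = new StringSetAccumulator
  sc.register(types, "elementTypes")   // explicit registration replaces sparkContext.accumulator(...)
  sc.parallelize(Seq[Any](1, "a", 2.0)).foreach(v => types.add(v.getClass.getName))
  types.value                          // partition results merged on the driver
}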

sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala

Lines changed: 1 addition & 104 deletions
@@ -17,48 +17,18 @@

 package org.apache.spark.sql.execution.metric

-import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
-
-import scala.collection.mutable
-
-import org.apache.xbean.asm5._
-import org.apache.xbean.asm5.Opcodes._
-
 import org.apache.spark.SparkFunSuite
 import org.apache.spark.sql._
 import org.apache.spark.sql.execution.SparkPlanInfo
 import org.apache.spark.sql.execution.ui.SparkPlanGraph
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSQLContext
-import org.apache.spark.util.{AccumulatorContext, JsonProtocol, Utils}
-
+import org.apache.spark.util.{AccumulatorContext, JsonProtocol}

 class SQLMetricsSuite extends SparkFunSuite with SharedSQLContext {
   import testImplicits._

-  test("SQLMetric should not box Long") {
-    val l = SQLMetrics.createMetric(sparkContext, "long")
-    val f = () => {
-      l += 1L
-      l.add(1L)
-    }
-    val cl = BoxingFinder.getClassReader(f.getClass)
-    val boxingFinder = new BoxingFinder()
-    cl.accept(boxingFinder, 0)
-    assert(boxingFinder.boxingInvokes.isEmpty, s"Found boxing: ${boxingFinder.boxingInvokes}")
-  }
-
-  test("Normal accumulator should do boxing") {
-    // We need this test to make sure BoxingFinder works.
-    val l = sparkContext.accumulator(0L)
-    val f = () => { l += 1L }
-    val cl = BoxingFinder.getClassReader(f.getClass)
-    val boxingFinder = new BoxingFinder()
-    cl.accept(boxingFinder, 0)
-    assert(boxingFinder.boxingInvokes.nonEmpty, "Found find boxing in this test")
-  }
-
   /**
    * Call `df.collect()` and verify if the collected metrics are same as "expectedMetrics".
    *
@@ -323,76 +293,3 @@
   }

 }
-
-private case class MethodIdentifier[T](cls: Class[T], name: String, desc: String)
-
-/**
- * If `method` is null, search all methods of this class recursively to find if they do some boxing.
- * If `method` is specified, only search this method of the class to speed up the searching.
- *
- * This method will skip the methods in `visitedMethods` to avoid potential infinite cycles.
- */
-private class BoxingFinder(
-    method: MethodIdentifier[_] = null,
-    val boxingInvokes: mutable.Set[String] = mutable.Set.empty,
-    visitedMethods: mutable.Set[MethodIdentifier[_]] = mutable.Set.empty)
-  extends ClassVisitor(ASM5) {
-
-  private val primitiveBoxingClassName =
-    Set("java/lang/Long",
-      "java/lang/Double",
-      "java/lang/Integer",
-      "java/lang/Float",
-      "java/lang/Short",
-      "java/lang/Character",
-      "java/lang/Byte",
-      "java/lang/Boolean")
-
-  override def visitMethod(
-      access: Int, name: String, desc: String, sig: String, exceptions: Array[String]):
-    MethodVisitor = {
-    if (method != null && (method.name != name || method.desc != desc)) {
-      // If method is specified, skip other methods.
-      return new MethodVisitor(ASM5) {}
-    }
-
-    new MethodVisitor(ASM5) {
-      override def visitMethodInsn(
-          op: Int, owner: String, name: String, desc: String, itf: Boolean) {
-        if (op == INVOKESPECIAL && name == "<init>" || op == INVOKESTATIC && name == "valueOf") {
-          if (primitiveBoxingClassName.contains(owner)) {
-            // Find boxing methods, e.g, new java.lang.Long(l) or java.lang.Long.valueOf(l)
-            boxingInvokes.add(s"$owner.$name")
-          }
-        } else {
-          // scalastyle:off classforname
-          val classOfMethodOwner = Class.forName(owner.replace('/', '.'), false,
-            Thread.currentThread.getContextClassLoader)
-          // scalastyle:on classforname
-          val m = MethodIdentifier(classOfMethodOwner, name, desc)
-          if (!visitedMethods.contains(m)) {
-            // Keep track of visited methods to avoid potential infinite cycles
-            visitedMethods += m
-            val cl = BoxingFinder.getClassReader(classOfMethodOwner)
-            visitedMethods += m
-            cl.accept(new BoxingFinder(m, boxingInvokes, visitedMethods), 0)
-          }
-        }
-      }
-    }
-  }
-}
-
-private object BoxingFinder {
-
-  def getClassReader(cls: Class[_]): ClassReader = {
-    val className = cls.getName.replaceFirst("^.*\\.", "") + ".class"
-    val resourceStream = cls.getResourceAsStream(className)
-    val baos = new ByteArrayOutputStream(128)
-    // Copy data over, before delegating to ClassReader -
-    // else we can run out of open file handles.
-    Utils.copyStream(resourceStream, baos, true)
-    new ClassReader(new ByteArrayInputStream(baos.toByteArray))
-  }
-
-}

yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala

Lines changed: 1 addition & 0 deletions
@@ -111,6 +111,7 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter
   }

   def createContainer(host: String): Container = {
+    // When YARN 2.6+ is required, avoid deprecation by using version with long second arg
     val containerId = ContainerId.newInstance(appAttemptId, containerNum)
     containerNum += 1
     val nodeId = NodeId.newInstance(host, 1000)
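
The new comment refers to a YARN API deprecation; a brief sketch of the alternative it alludes to (assumption: Hadoop/YARN 2.6+, where ContainerId.newContainerId takes a long id and replaces the deprecated newInstance):

import org.apache.hadoop.yarn.api.records.{ApplicationAttemptId, ContainerId}

// Illustrative only: not part of this commit.
def makeContainerId(appAttemptId: ApplicationAttemptId, containerNum: Int): ContainerId = {
  // Deprecated on YARN 2.6+: ContainerId.newInstance(appAttemptId, containerNum)
  ContainerId.newContainerId(appAttemptId, containerNum.toLong)
}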
