Skip to content
This repository was archived by the owner on Nov 15, 2024. It is now read-only.

Commit b2f22cd

Browse files
zsxwing authored and MatthewRBruce committed
[SPARK-19644][SQL] Clean up Scala reflection garbage after creating Encoder (branch-2.2)
## What changes were proposed in this pull request?

Backport apache#19687 to branch-2.2. The major difference is that `cleanUpReflectionObjects` is protected by `ScalaReflectionLock.synchronized` in this PR for Scala 2.10.

## How was this patch tested?

Jenkins

Author: Shixiong Zhu <[email protected]>

Closes apache#19718 from zsxwing/SPARK-19644-2.2.
1 parent e7e44b4 commit b2f22cd

File tree

2 files changed

+54
-9
lines changed

2 files changed

+54
-9
lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala

Lines changed: 21 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ object ScalaReflection extends ScalaReflection {
6262
*/
6363
def dataTypeFor[T : TypeTag]: DataType = dataTypeFor(localTypeOf[T])
6464

65-
private def dataTypeFor(tpe: `Type`): DataType = ScalaReflectionLock.synchronized {
65+
private def dataTypeFor(tpe: `Type`): DataType = cleanUpReflectionObjects {
6666
tpe match {
6767
case t if t <:< definitions.IntTpe => IntegerType
6868
case t if t <:< definitions.LongTpe => LongType
@@ -92,7 +92,7 @@ object ScalaReflection extends ScalaReflection {
9292
* Array[T]. Special handling is performed for primitive types to map them back to their raw
9393
* JVM form instead of the Scala Array that handles auto boxing.
9494
*/
95-
private def arrayClassFor(tpe: `Type`): ObjectType = ScalaReflectionLock.synchronized {
95+
private def arrayClassFor(tpe: `Type`): ObjectType = cleanUpReflectionObjects {
9696
val cls = tpe match {
9797
case t if t <:< definitions.IntTpe => classOf[Array[Int]]
9898
case t if t <:< definitions.LongTpe => classOf[Array[Long]]
@@ -145,7 +145,7 @@ object ScalaReflection extends ScalaReflection {
145145
private def deserializerFor(
146146
tpe: `Type`,
147147
path: Option[Expression],
148-
walkedTypePath: Seq[String]): Expression = ScalaReflectionLock.synchronized {
148+
walkedTypePath: Seq[String]): Expression = cleanUpReflectionObjects {
149149

150150
/** Returns the current path with a sub-field extracted. */
151151
def addToPath(part: String, dataType: DataType, walkedTypePath: Seq[String]): Expression = {
@@ -452,7 +452,7 @@ object ScalaReflection extends ScalaReflection {
452452
inputObject: Expression,
453453
tpe: `Type`,
454454
walkedTypePath: Seq[String],
455-
seenTypeSet: Set[`Type`] = Set.empty): Expression = ScalaReflectionLock.synchronized {
455+
seenTypeSet: Set[`Type`] = Set.empty): Expression = cleanUpReflectionObjects {
456456

457457
def toCatalystArray(input: Expression, elementType: `Type`): Expression = {
458458
dataTypeFor(elementType) match {
@@ -638,7 +638,7 @@ object ScalaReflection extends ScalaReflection {
638638
* Returns true if the given type is option of product type, e.g. `Option[Tuple2]`. Note that,
639639
* we also treat [[DefinedByConstructorParams]] as product type.
640640
*/
641-
def optionOfProductType(tpe: `Type`): Boolean = ScalaReflectionLock.synchronized {
641+
def optionOfProductType(tpe: `Type`): Boolean = cleanUpReflectionObjects {
642642
tpe match {
643643
case t if t <:< localTypeOf[Option[_]] =>
644644
val TypeRef(_, _, Seq(optType)) = t
@@ -700,7 +700,7 @@ object ScalaReflection extends ScalaReflection {
700700
def schemaFor[T: TypeTag]: Schema = schemaFor(localTypeOf[T])
701701

702702
/** Returns a catalyst DataType and its nullability for the given Scala Type using reflection. */
703-
def schemaFor(tpe: `Type`): Schema = ScalaReflectionLock.synchronized {
703+
def schemaFor(tpe: `Type`): Schema = cleanUpReflectionObjects {
704704
tpe match {
705705
case t if t.typeSymbol.annotations.exists(_.tpe =:= typeOf[SQLUserDefinedType]) =>
706706
val udt = getClassFromType(t).getAnnotation(classOf[SQLUserDefinedType]).udt().newInstance()
@@ -766,7 +766,7 @@ object ScalaReflection extends ScalaReflection {
766766
/**
767767
* Whether the fields of the given type is defined entirely by its constructor parameters.
768768
*/
769-
def definedByConstructorParams(tpe: Type): Boolean = {
769+
def definedByConstructorParams(tpe: Type): Boolean = cleanUpReflectionObjects {
770770
tpe <:< localTypeOf[Product] || tpe <:< localTypeOf[DefinedByConstructorParams]
771771
}
772772

@@ -795,6 +795,20 @@ trait ScalaReflection {
795795
// Since the map values can be mutable, we explicitly import scala.collection.Map at here.
796796
import scala.collection.Map
797797

798+
/**
799+
* Any codes calling `scala.reflect.api.Types.TypeApi.<:<` should be wrapped by this method to
800+
* clean up the Scala reflection garbage automatically. Otherwise, it will leak some objects to
801+
* `scala.reflect.runtime.JavaUniverse.undoLog`.
802+
*
803+
* This method will also wrap `func` with `ScalaReflectionLock.synchronized` so the caller doesn't
804+
* need to call it again.
805+
*
806+
* @see https://github.com/scala/bug/issues/8302
807+
*/
808+
def cleanUpReflectionObjects[T](func: => T): T = ScalaReflectionLock.synchronized {
809+
universe.asInstanceOf[scala.reflect.runtime.JavaUniverse].undoLog.undo(func)
810+
}
811+
798812
/**
799813
* Return the Scala Type for `T` in the current classloader mirror.
800814
*

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoderSuite.scala

Lines changed: 33 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, Project}
3434
import org.apache.spark.sql.catalyst.util.ArrayData
3535
import org.apache.spark.sql.types._
3636
import org.apache.spark.unsafe.types.UTF8String
37+
import org.apache.spark.util.ClosureCleaner
3738

3839
case class RepeatedStruct(s: Seq[PrimitiveData])
3940

@@ -114,7 +115,9 @@ object ReferenceValueClass {
114115
class ExpressionEncoderSuite extends PlanTest with AnalysisTest {
115116
OuterScopes.addOuterScope(this)
116117

117-
implicit def encoder[T : TypeTag]: ExpressionEncoder[T] = ExpressionEncoder()
118+
implicit def encoder[T : TypeTag]: ExpressionEncoder[T] = verifyNotLeakingReflectionObjects {
119+
ExpressionEncoder()
120+
}
118121

119122
// test flat encoders
120123
encodeDecodeTest(false, "primitive boolean")
@@ -370,8 +373,12 @@ class ExpressionEncoderSuite extends PlanTest with AnalysisTest {
370373
private def encodeDecodeTest[T : ExpressionEncoder](
371374
input: T,
372375
testName: String): Unit = {
373-
test(s"encode/decode for $testName: $input") {
376+
testAndVerifyNotLeakingReflectionObjects(s"encode/decode for $testName: $input") {
374377
val encoder = implicitly[ExpressionEncoder[T]]
378+
379+
// Make sure encoder is serializable.
380+
ClosureCleaner.clean((s: String) => encoder.getClass.getName)
381+
375382
val row = encoder.toRow(input)
376383
val schema = encoder.schema.toAttributes
377384
val boundEncoder = encoder.resolveAndBind()
@@ -441,4 +448,28 @@ class ExpressionEncoderSuite extends PlanTest with AnalysisTest {
441448
}
442449
}
443450
}
451+
452+
/**
453+
* Verify the size of scala.reflect.runtime.JavaUniverse.undoLog before and after `func` to
454+
* ensure we don't leak Scala reflection garbage.
455+
*
456+
* @see org.apache.spark.sql.catalyst.ScalaReflection.cleanUpReflectionObjects
457+
*/
458+
private def verifyNotLeakingReflectionObjects[T](func: => T): T = {
459+
def undoLogSize: Int = {
460+
scala.reflect.runtime.universe
461+
.asInstanceOf[scala.reflect.runtime.JavaUniverse].undoLog.log.size
462+
}
463+
464+
val previousUndoLogSize = undoLogSize
465+
val r = func
466+
assert(previousUndoLogSize == undoLogSize)
467+
r
468+
}
469+
470+
private def testAndVerifyNotLeakingReflectionObjects(testName: String)(testFun: => Any) {
471+
test(testName) {
472+
verifyNotLeakingReflectionObjects(testFun)
473+
}
474+
}
444475
}

0 commit comments

Comments (0)