Skip to content

Commit 9bf9885

Browse files
cloud-fan authored and marmbrus committed
[SPARK-11564][SQL][FOLLOW-UP] clean up java tuple encoder
We need to support custom classes like Java beans and combine them into tuples, which is very hard to do with the TypeTag-based approach. We should keep only the compose-based way of creating tuple encoders. This PR also moves `Encoder` to `org.apache.spark.sql`. Author: Wenchen Fan <[email protected]> Closes #9567 from cloud-fan/java. (cherry picked from commit ec2b807) Signed-off-by: Michael Armbrust <[email protected]>
1 parent f9aeb96 commit 9bf9885

File tree

14 files changed

+65
-113
lines changed

14 files changed

+65
-113
lines changed
Lines changed: 7 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -15,13 +15,14 @@
1515
* limitations under the License.
1616
*/
1717

18-
package org.apache.spark.sql.catalyst.encoders
18+
package org.apache.spark.sql
1919

20-
import scala.reflect.ClassTag
21-
22-
import org.apache.spark.util.Utils
23-
import org.apache.spark.sql.types.{ObjectType, StructField, StructType}
20+
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
2421
import org.apache.spark.sql.catalyst.expressions._
22+
import org.apache.spark.sql.types.{ObjectType, StructField, StructType}
23+
import org.apache.spark.util.Utils
24+
25+
import scala.reflect.ClassTag
2526

2627
/**
2728
* Used to convert a JVM object of type `T` to and from the internal Spark SQL representation.
@@ -38,9 +39,7 @@ trait Encoder[T] extends Serializable {
3839
def clsTag: ClassTag[T]
3940
}
4041

41-
object Encoder {
42-
import scala.reflect.runtime.universe._
43-
42+
object Encoders {
4443
def BOOLEAN: Encoder[java.lang.Boolean] = ExpressionEncoder(flat = true)
4544
def BYTE: Encoder[java.lang.Byte] = ExpressionEncoder(flat = true)
4645
def SHORT: Encoder[java.lang.Short] = ExpressionEncoder(flat = true)
@@ -129,54 +128,4 @@ object Encoder {
129128
constructExpression,
130129
ClassTag.apply(cls))
131130
}
132-
133-
def typeTagOfTuple2[T1 : TypeTag, T2 : TypeTag]: TypeTag[(T1, T2)] = typeTag[(T1, T2)]
134-
135-
private def getTypeTag[T](c: Class[T]): TypeTag[T] = {
136-
import scala.reflect.api
137-
138-
// val mirror = runtimeMirror(c.getClassLoader)
139-
val mirror = rootMirror
140-
val sym = mirror.staticClass(c.getName)
141-
val tpe = sym.selfType
142-
TypeTag(mirror, new api.TypeCreator {
143-
def apply[U <: api.Universe with Singleton](m: api.Mirror[U]) =
144-
if (m eq mirror) tpe.asInstanceOf[U # Type]
145-
else throw new IllegalArgumentException(
146-
s"Type tag defined in $mirror cannot be migrated to other mirrors.")
147-
})
148-
}
149-
150-
def forTuple[T1, T2](c1: Class[T1], c2: Class[T2]): Encoder[(T1, T2)] = {
151-
implicit val typeTag1 = getTypeTag(c1)
152-
implicit val typeTag2 = getTypeTag(c2)
153-
ExpressionEncoder[(T1, T2)]()
154-
}
155-
156-
def forTuple[T1, T2, T3](c1: Class[T1], c2: Class[T2], c3: Class[T3]): Encoder[(T1, T2, T3)] = {
157-
implicit val typeTag1 = getTypeTag(c1)
158-
implicit val typeTag2 = getTypeTag(c2)
159-
implicit val typeTag3 = getTypeTag(c3)
160-
ExpressionEncoder[(T1, T2, T3)]()
161-
}
162-
163-
def forTuple[T1, T2, T3, T4](
164-
c1: Class[T1], c2: Class[T2], c3: Class[T3], c4: Class[T4]): Encoder[(T1, T2, T3, T4)] = {
165-
implicit val typeTag1 = getTypeTag(c1)
166-
implicit val typeTag2 = getTypeTag(c2)
167-
implicit val typeTag3 = getTypeTag(c3)
168-
implicit val typeTag4 = getTypeTag(c4)
169-
ExpressionEncoder[(T1, T2, T3, T4)]()
170-
}
171-
172-
def forTuple[T1, T2, T3, T4, T5](
173-
c1: Class[T1], c2: Class[T2], c3: Class[T3], c4: Class[T4], c5: Class[T5])
174-
: Encoder[(T1, T2, T3, T4, T5)] = {
175-
implicit val typeTag1 = getTypeTag(c1)
176-
implicit val typeTag2 = getTypeTag(c2)
177-
implicit val typeTag3 = getTypeTag(c3)
178-
implicit val typeTag4 = getTypeTag(c4)
179-
implicit val typeTag5 = getTypeTag(c5)
180-
ExpressionEncoder[(T1, T2, T3, T4, T5)]()
181-
}
182131
}

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,18 +17,18 @@
1717

1818
package org.apache.spark.sql.catalyst.encoders
1919

20-
import org.apache.spark.sql.catalyst.analysis.{SimpleAnalyzer, UnresolvedExtractValue, UnresolvedAttribute}
21-
import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, Project}
22-
import org.apache.spark.util.Utils
23-
2420
import scala.reflect.ClassTag
2521
import scala.reflect.runtime.universe.{typeTag, TypeTag}
2622

23+
import org.apache.spark.util.Utils
24+
import org.apache.spark.sql.Encoder
25+
import org.apache.spark.sql.catalyst.analysis.{SimpleAnalyzer, UnresolvedExtractValue, UnresolvedAttribute}
26+
import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, Project}
2727
import org.apache.spark.sql.catalyst.expressions._
2828
import org.apache.spark.sql.catalyst.expressions.codegen.{GenerateSafeProjection, GenerateUnsafeProjection}
2929
import org.apache.spark.sql.catalyst.InternalRow
3030
import org.apache.spark.sql.catalyst.ScalaReflection
31-
import org.apache.spark.sql.types.{StructField, DataType, ObjectType, StructType}
31+
import org.apache.spark.sql.types.{StructField, ObjectType, StructType}
3232

3333
/**
3434
* A factory for constructing encoders that convert objects and primitives to and from the

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/package.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,11 @@
1717

1818
package org.apache.spark.sql.catalyst
1919

20+
import org.apache.spark.sql.Encoder
21+
2022
package object encoders {
2123
private[sql] def encoderFor[A : Encoder]: ExpressionEncoder[A] = implicitly[Encoder[A]] match {
2224
case e: ExpressionEncoder[A] => e
2325
case _ => sys.error(s"Only expression encoders are supported today")
2426
}
2527
}
26-

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
package org.apache.spark.sql.catalyst.plans.logical
1919

20+
import org.apache.spark.sql.Encoder
2021
import org.apache.spark.sql.catalyst.encoders._
2122
import org.apache.spark.sql.catalyst.expressions._
2223
import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression

sql/core/src/main/scala/org/apache/spark/sql/Column.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ import org.apache.spark.annotation.Experimental
2323
import org.apache.spark.Logging
2424
import org.apache.spark.sql.functions.lit
2525
import org.apache.spark.sql.catalyst.analysis._
26-
import org.apache.spark.sql.catalyst.encoders.{encoderFor, Encoder}
26+
import org.apache.spark.sql.catalyst.encoders.encoderFor
2727
import org.apache.spark.sql.catalyst.expressions._
2828
import org.apache.spark.sql.catalyst.util.DataTypeParser
2929
import org.apache.spark.sql.types._

sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@ import java.util.Properties
2323
import scala.language.implicitConversions
2424
import scala.reflect.ClassTag
2525
import scala.reflect.runtime.universe.TypeTag
26-
import scala.util.control.NonFatal
2726

2827
import com.fasterxml.jackson.core.JsonFactory
2928
import org.apache.commons.lang3.StringUtils
@@ -35,7 +34,6 @@ import org.apache.spark.sql.catalyst.InternalRow
3534
import org.apache.spark.sql.catalyst.analysis._
3635
import org.apache.spark.sql.catalyst.expressions._
3736
import org.apache.spark.sql.catalyst.expressions.aggregate._
38-
import org.apache.spark.sql.catalyst.encoders.Encoder
3937
import org.apache.spark.sql.catalyst.plans.logical._
4038
import org.apache.spark.sql.catalyst.plans.{Inner, JoinType}
4139
import org.apache.spark.sql.catalyst.{CatalystTypeConverters, ScalaReflection, SqlParser}

sql/core/src/main/scala/org/apache/spark/sql/GroupedDataset.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ import scala.collection.JavaConverters._
2424
import org.apache.spark.annotation.Experimental
2525
import org.apache.spark.api.java.function.{Function2 => JFunction2, Function3 => JFunction3, _}
2626
import org.apache.spark.sql.catalyst.analysis.{UnresolvedAlias, UnresolvedAttribute}
27-
import org.apache.spark.sql.catalyst.encoders.{ExpressionEncoder, encoderFor, Encoder}
27+
import org.apache.spark.sql.catalyst.encoders.{ExpressionEncoder, encoderFor}
2828
import org.apache.spark.sql.catalyst.expressions.{Expression, NamedExpression, Alias, Attribute}
2929
import org.apache.spark.sql.catalyst.plans.logical._
3030
import org.apache.spark.sql.execution.aggregate.TypedAggregateExpression

sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ import org.apache.spark.rdd.RDD
3333
import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd}
3434
import org.apache.spark.sql.SQLConf.SQLConfEntry
3535
import org.apache.spark.sql.catalyst.analysis._
36-
import org.apache.spark.sql.catalyst.encoders.{encoderFor, Encoder}
36+
import org.apache.spark.sql.catalyst.encoders.encoderFor
3737
import org.apache.spark.sql.catalyst.errors.DialectException
3838
import org.apache.spark.sql.catalyst.expressions._
3939
import org.apache.spark.sql.catalyst.optimizer.{DefaultOptimizer, Optimizer}

sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TypedAggregateExpression.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,10 @@ package org.apache.spark.sql.execution.aggregate
2020
import scala.language.existentials
2121

2222
import org.apache.spark.Logging
23+
import org.apache.spark.sql.Encoder
2324
import org.apache.spark.sql.expressions.Aggregator
2425
import org.apache.spark.sql.catalyst.InternalRow
25-
import org.apache.spark.sql.catalyst.encoders.{encoderFor, Encoder}
26+
import org.apache.spark.sql.catalyst.encoders.encoderFor
2627
import org.apache.spark.sql.catalyst.expressions.aggregate.ImperativeAggregate
2728
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
2829
import org.apache.spark.sql.catalyst.expressions._

sql/core/src/main/scala/org/apache/spark/sql/expressions/Aggregator.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,8 @@
1717

1818
package org.apache.spark.sql.expressions
1919

20-
import org.apache.spark.sql.catalyst.encoders.{encoderFor, Encoder}
20+
import org.apache.spark.sql.Encoder
21+
import org.apache.spark.sql.catalyst.encoders.encoderFor
2122
import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression, Complete}
2223
import org.apache.spark.sql.execution.aggregate.TypedAggregateExpression
2324
import org.apache.spark.sql.{Dataset, DataFrame, TypedColumn}

0 commit comments

Comments
 (0)