From d0681b33dbcb541bf53a7ef47f1fe5664d70a819 Mon Sep 17 00:00:00 2001
From: Wenchen Fan
Date: Tue, 10 Nov 2015 10:43:37 +0800
Subject: [PATCH 1/3] clean up tuple encoder for java

---
 .../spark/sql/{catalyst/encoders => }/Encoder.scala   | 11 ++++++-----
 .../sql/catalyst/encoders/ExpressionEncoder.scala     |  1 +
 .../apache/spark/sql/catalyst/encoders/package.scala  |  2 ++
 .../sql/catalyst/plans/logical/basicOperators.scala   |  1 +
 .../src/main/scala/org/apache/spark/sql/Column.scala  |  1 -
 .../main/scala/org/apache/spark/sql/DataFrame.scala   |  1 -
 .../scala/org/apache/spark/sql/GroupedDataset.scala   |  2 +-
 .../main/scala/org/apache/spark/sql/SQLContext.scala  |  2 +-
 .../test/org/apache/spark/sql/JavaDatasetSuite.java   |  5 ++---
 .../test/scala/org/apache/spark/sql/QueryTest.scala   |  1 -
 10 files changed, 14 insertions(+), 13 deletions(-)
 rename sql/catalyst/src/main/scala/org/apache/spark/sql/{catalyst/encoders => }/Encoder.scala (98%)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/Encoder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/Encoder.scala
similarity index 98%
rename from sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/Encoder.scala
rename to sql/catalyst/src/main/scala/org/apache/spark/sql/Encoder.scala
index 6569b900fed9..dcdbdde76ce2 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/Encoder.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/Encoder.scala
@@ -15,13 +15,14 @@
  * limitations under the License.
  */

-package org.apache.spark.sql.catalyst.encoders
+package org.apache.spark.sql

-import scala.reflect.ClassTag
-
-import org.apache.spark.util.Utils
-import org.apache.spark.sql.types.{ObjectType, StructField, StructType}
+import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
 import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.types.{ObjectType, StructField, StructType}
+import org.apache.spark.util.Utils
+
+import scala.reflect.ClassTag

 /**
  * Used to convert a JVM object of type `T` to and from the internal Spark SQL representation.
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala
index c287aebeeee0..906e6dc9c8c5 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala
@@ -17,6 +17,7 @@

 package org.apache.spark.sql.catalyst.encoders

+import org.apache.spark.sql.Encoder
 import org.apache.spark.sql.catalyst.analysis.{SimpleAnalyzer, UnresolvedExtractValue, UnresolvedAttribute}
 import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, Project}
 import org.apache.spark.util.Utils
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/package.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/package.scala
index d4642a500672..6a47a6ad3342 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/package.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/package.scala
@@ -17,6 +17,8 @@

 package org.apache.spark.sql.catalyst

+import org.apache.spark.sql.Encoder
+
 package object encoders {
   private[sql] def encoderFor[A : Encoder]: ExpressionEncoder[A] = implicitly[Encoder[A]] match {
     case e: ExpressionEncoder[A] => e
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
index e151ac04ede2..5022253752c4 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
@@ -17,6 +17,7 @@

 package org.apache.spark.sql.catalyst.plans.logical

+import org.apache.spark.sql.Encoder
 import org.apache.spark.sql.catalyst.encoders._
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.aggregate.Utils
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Column.scala b/sql/core/src/main/scala/org/apache/spark/sql/Column.scala
index c32c93897ce0..9f1fefee4fe4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Column.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Column.scala
@@ -23,7 +23,6 @@ import org.apache.spark.annotation.Experimental
 import org.apache.spark.Logging
 import org.apache.spark.sql.functions.lit
 import org.apache.spark.sql.catalyst.analysis._
-import org.apache.spark.sql.catalyst.encoders.Encoder
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.util.DataTypeParser
 import org.apache.spark.sql.types._
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
index 8ab958adadcc..de8e9fa0ba52 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
@@ -34,7 +34,6 @@ import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.analysis._
 import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.encoders.Encoder
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.plans.{Inner, JoinType}
 import org.apache.spark.sql.catalyst.{CatalystTypeConverters, ScalaReflection, SqlParser}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/GroupedDataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/GroupedDataset.scala
index 5c3f62654587..9177e39fc60b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/GroupedDataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/GroupedDataset.scala
@@ -23,7 +23,7 @@ import scala.collection.JavaConverters._
 import org.apache.spark.annotation.Experimental
 import org.apache.spark.api.java.function.{Function2 => JFunction2, Function3 => JFunction3, _}
 import org.apache.spark.sql.catalyst.analysis.{UnresolvedAlias, UnresolvedAttribute}
-import org.apache.spark.sql.catalyst.encoders.{ExpressionEncoder, encoderFor, Encoder}
+import org.apache.spark.sql.catalyst.encoders.{ExpressionEncoder, encoderFor}
 import org.apache.spark.sql.catalyst.expressions.{Expression, NamedExpression, Alias, Attribute}
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.execution.QueryExecution
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
index 5598731af5fc..bcfb409dc7c9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -34,7 +34,7 @@ import org.apache.spark.rdd.RDD
 import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd}
 import org.apache.spark.sql.SQLConf.SQLConfEntry
 import org.apache.spark.sql.catalyst.analysis._
-import org.apache.spark.sql.catalyst.encoders.{encoderFor, Encoder}
+import org.apache.spark.sql.catalyst.encoders.encoderFor
 import org.apache.spark.sql.catalyst.errors.DialectException
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.optimizer.{DefaultOptimizer, Optimizer}
diff --git a/sql/core/src/test/java/test/org/apache/spark/sql/JavaDatasetSuite.java b/sql/core/src/test/java/test/org/apache/spark/sql/JavaDatasetSuite.java
index 0f90de774dd3..11526ae3a872 100644
--- a/sql/core/src/test/java/test/org/apache/spark/sql/JavaDatasetSuite.java
+++ b/sql/core/src/test/java/test/org/apache/spark/sql/JavaDatasetSuite.java
@@ -29,10 +29,9 @@
 import org.apache.spark.Accumulator;
 import org.apache.spark.SparkContext;
 import org.apache.spark.api.java.function.*;
-import org.apache.spark.api.java.function.Function;
 import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.sql.catalyst.encoders.Encoder;
-import org.apache.spark.sql.catalyst.encoders.Encoder$;
+import org.apache.spark.sql.Encoder;
+import org.apache.spark.sql.Encoder$;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.GroupedDataset;
 import org.apache.spark.sql.test.TestSQLContext;
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala
index 3c174efe73ff..7a8b7ae5bf26 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala
@@ -24,7 +24,6 @@ import scala.collection.JavaConverters._

 import org.apache.spark.sql.catalyst.plans._
 import org.apache.spark.sql.catalyst.util._
 import org.apache.spark.sql.columnar.InMemoryRelation
-import org.apache.spark.sql.catalyst.encoders.Encoder

 abstract class QueryTest extends PlanTest {

From fb1fe8f2b87df9e84ab794c24dbf965affb28cb4 Mon Sep 17 00:00:00 2001
From: Wenchen Fan
Date: Tue, 10 Nov 2015 10:49:19 +0800
Subject: [PATCH 2/3] remove unused code

---
 .../scala/org/apache/spark/sql/Encoder.scala       | 52 ------------------
 .../catalyst/encoders/ExpressionEncoder.scala      | 11 ++--
 .../spark/sql/catalyst/encoders/package.scala      |  1 -
 .../aggregate/TypedAggregateExpression.scala       |  3 +-
 .../spark/sql/expressions/Aggregator.scala         |  3 +-
 .../org/apache/spark/sql/functions.scala           |  2 +-
 .../spark/sql/DatasetAggregatorSuite.scala         |  4 +-
 7 files changed, 11 insertions(+), 65 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/Encoder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/Encoder.scala
index dcdbdde76ce2..3e9f5f22ba7b 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/Encoder.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/Encoder.scala
@@ -40,8 +40,6 @@ trait Encoder[T] extends Serializable {
 }

 object Encoder {
-  import scala.reflect.runtime.universe._
-
   def BOOLEAN: Encoder[java.lang.Boolean] = ExpressionEncoder(flat = true)
   def BYTE: Encoder[java.lang.Byte] = ExpressionEncoder(flat = true)
   def SHORT: Encoder[java.lang.Short] = ExpressionEncoder(flat = true)
@@ -130,54 +128,4 @@ object Encoder {
       constructExpression,
       ClassTag.apply(cls))
   }
-
-  def typeTagOfTuple2[T1 : TypeTag, T2 : TypeTag]: TypeTag[(T1, T2)] = typeTag[(T1, T2)]
-
-  private def getTypeTag[T](c: Class[T]): TypeTag[T] = {
-    import scala.reflect.api
-
-    // val mirror = runtimeMirror(c.getClassLoader)
-    val mirror = rootMirror
-    val sym = mirror.staticClass(c.getName)
-    val tpe = sym.selfType
-    TypeTag(mirror, new api.TypeCreator {
-      def apply[U <: api.Universe with Singleton](m: api.Mirror[U]) =
-        if (m eq mirror) tpe.asInstanceOf[U # Type]
-        else throw new IllegalArgumentException(
-          s"Type tag defined in $mirror cannot be migrated to other mirrors.")
-    })
-  }
-
-  def forTuple[T1, T2](c1: Class[T1], c2: Class[T2]): Encoder[(T1, T2)] = {
-    implicit val typeTag1 = getTypeTag(c1)
-    implicit val typeTag2 = getTypeTag(c2)
-    ExpressionEncoder[(T1, T2)]()
-  }
-
-  def forTuple[T1, T2, T3](c1: Class[T1], c2: Class[T2], c3: Class[T3]): Encoder[(T1, T2, T3)] = {
-    implicit val typeTag1 = getTypeTag(c1)
-    implicit val typeTag2 = getTypeTag(c2)
-    implicit val typeTag3 = getTypeTag(c3)
-    ExpressionEncoder[(T1, T2, T3)]()
-  }
-
-  def forTuple[T1, T2, T3, T4](
-      c1: Class[T1], c2: Class[T2], c3: Class[T3], c4: Class[T4]): Encoder[(T1, T2, T3, T4)] = {
-    implicit val typeTag1 = getTypeTag(c1)
-    implicit val typeTag2 = getTypeTag(c2)
-    implicit val typeTag3 = getTypeTag(c3)
-    implicit val typeTag4 = getTypeTag(c4)
-    ExpressionEncoder[(T1, T2, T3, T4)]()
-  }
-
-  def forTuple[T1, T2, T3, T4, T5](
-      c1: Class[T1], c2: Class[T2], c3: Class[T3], c4: Class[T4], c5: Class[T5])
-    : Encoder[(T1, T2, T3, T4, T5)] = {
-    implicit val typeTag1 = getTypeTag(c1)
-    implicit val typeTag2 = getTypeTag(c2)
-    implicit val typeTag3 = getTypeTag(c3)
-    implicit val typeTag4 = getTypeTag(c4)
-    implicit val typeTag5 = getTypeTag(c5)
-    ExpressionEncoder[(T1, T2, T3, T4, T5)]()
-  }
 }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala
index 906e6dc9c8c5..2dcd987c1b03 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala
@@ -17,19 +17,18 @@

 package org.apache.spark.sql.catalyst.encoders

-import org.apache.spark.sql.Encoder
-import org.apache.spark.sql.catalyst.analysis.{SimpleAnalyzer, UnresolvedExtractValue, UnresolvedAttribute}
-import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, Project}
-import org.apache.spark.util.Utils
-
 import scala.reflect.ClassTag
 import scala.reflect.runtime.universe.{typeTag, TypeTag}

+import org.apache.spark.util.Utils
+import org.apache.spark.sql.Encoder
+import org.apache.spark.sql.catalyst.analysis.{SimpleAnalyzer, UnresolvedExtractValue, UnresolvedAttribute}
+import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, Project}
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.codegen.{GenerateSafeProjection, GenerateUnsafeProjection}
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.ScalaReflection
-import org.apache.spark.sql.types.{StructField, DataType, ObjectType, StructType}
+import org.apache.spark.sql.types.{StructField, ObjectType, StructType}

 /**
  * A factory for constructing encoders that convert objects and primitves to and from the
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/package.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/package.scala
index 6a47a6ad3342..2c35adca9c92 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/package.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/package.scala
@@ -25,4 +25,3 @@ package object encoders {
     case _ => sys.error(s"Only expression encoders are supported today")
   }
 }
-
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TypedAggregateExpression.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TypedAggregateExpression.scala
index 24d8122b6222..e6ad4e252d12 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TypedAggregateExpression.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TypedAggregateExpression.scala
@@ -20,8 +20,9 @@ package org.apache.spark.sql.execution.aggregate
 import scala.language.existentials

 import org.apache.spark.Logging
+import org.apache.spark.sql.Encoder
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.encoders.{encoderFor, Encoder}
+import org.apache.spark.sql.catalyst.encoders.encoderFor
 import org.apache.spark.sql.catalyst.expressions.aggregate.ImperativeAggregate
 import org.apache.spark.sql.expressions.Aggregator
 import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/expressions/Aggregator.scala b/sql/core/src/main/scala/org/apache/spark/sql/expressions/Aggregator.scala
index 0b3192a6da9d..c7390b679d45 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/expressions/Aggregator.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/expressions/Aggregator.scala
@@ -17,7 +17,8 @@

 package org.apache.spark.sql.expressions

-import org.apache.spark.sql.catalyst.encoders.{encoderFor, Encoder}
+import org.apache.spark.sql.Encoder
+import org.apache.spark.sql.catalyst.encoders.encoderFor
 import org.apache.spark.sql.catalyst.expressions.aggregate.{Complete, AggregateExpression2}
 import org.apache.spark.sql.execution.aggregate.TypedAggregateExpression
 import org.apache.spark.sql.{Dataset, DataFrame, TypedColumn}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
index 6d56542ee087..5501ec04072d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
@@ -26,7 +26,7 @@ import scala.util.Try
 import org.apache.spark.annotation.Experimental
 import org.apache.spark.sql.catalyst.{SqlParser, ScalaReflection}
 import org.apache.spark.sql.catalyst.analysis.{UnresolvedFunction, Star}
-import org.apache.spark.sql.catalyst.encoders.{ExpressionEncoder, Encoder}
+import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.logical.BroadcastHint
 import org.apache.spark.sql.types._
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetAggregatorSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DatasetAggregatorSuite.scala
index 340470c096b8..cf26e2f68b68 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetAggregatorSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetAggregatorSuite.scala
@@ -17,13 +17,11 @@

 package org.apache.spark.sql

-import org.apache.spark.sql.catalyst.encoders.Encoder
-import org.apache.spark.sql.functions._
 import scala.language.postfixOps

 import org.apache.spark.sql.test.SharedSQLContext
-
+import org.apache.spark.sql.functions._
 import org.apache.spark.sql.expressions.Aggregator

 /** An `Aggregator` that adds up any numeric type returned by the given function. */

From 9fc5456812482f52b02a2d11b8b14c5bc89534b5 Mon Sep 17 00:00:00 2001
From: Wenchen Fan
Date: Wed, 11 Nov 2015 11:39:20 +0800
Subject: [PATCH 3/3] rename Encoder to Encoders

---
 .../scala/org/apache/spark/sql/Encoder.scala       |  2 +-
 .../apache/spark/sql/JavaDatasetSuite.java         | 76 ++++++++++---------
 2 files changed, 41 insertions(+), 37 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/Encoder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/Encoder.scala
index 3e9f5f22ba7b..1ff7340557e6 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/Encoder.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/Encoder.scala
@@ -39,7 +39,7 @@ trait Encoder[T] extends Serializable {
   def clsTag: ClassTag[T]
 }

-object Encoder {
+object Encoders {
   def BOOLEAN: Encoder[java.lang.Boolean] = ExpressionEncoder(flat = true)
   def BYTE: Encoder[java.lang.Byte] = ExpressionEncoder(flat = true)
   def SHORT: Encoder[java.lang.Short] = ExpressionEncoder(flat = true)
diff --git a/sql/core/src/test/java/test/org/apache/spark/sql/JavaDatasetSuite.java b/sql/core/src/test/java/test/org/apache/spark/sql/JavaDatasetSuite.java
index e63f190b5542..33d8388f615a 100644
--- a/sql/core/src/test/java/test/org/apache/spark/sql/JavaDatasetSuite.java
+++ b/sql/core/src/test/java/test/org/apache/spark/sql/JavaDatasetSuite.java
@@ -31,7 +31,7 @@
 import org.apache.spark.api.java.function.*;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.sql.Encoder;
-import org.apache.spark.sql.Encoder$;
+import org.apache.spark.sql.Encoders;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.GroupedDataset;
 import org.apache.spark.sql.test.TestSQLContext;
@@ -41,7 +41,6 @@ public class JavaDatasetSuite implements Serializable {

   private transient JavaSparkContext jsc;
   private transient TestSQLContext context;
-  private transient Encoder$ e = Encoder$.MODULE$;

   @Before
   public void setUp() {
@@ -66,7 +65,7 @@ private <T1, T2> Tuple2<T1, T2> tuple2(T1 t1, T2 t2) {
   @Test
   public void testCollect() {
     List<String> data = Arrays.asList("hello", "world");
-    Dataset<String> ds = context.createDataset(data, e.STRING());
+    Dataset<String> ds = context.createDataset(data, Encoders.STRING());
     List<String> collected = ds.collectAsList();
     Assert.assertEquals(Arrays.asList("hello", "world"), collected);
   }
@@ -74,7 +73,7 @@ public void testCollect() {
   @Test
   public void testTake() {
     List<String> data = Arrays.asList("hello", "world");
-    Dataset<String> ds = context.createDataset(data, e.STRING());
+    Dataset<String> ds = context.createDataset(data, Encoders.STRING());
     List<String> collected = ds.takeAsList(1);
     Assert.assertEquals(Arrays.asList("hello"), collected);
   }
@@ -82,7 +81,7 @@ public void testTake() {
   @Test
   public void testCommonOperation() {
     List<String> data = Arrays.asList("hello", "world");
-    Dataset<String> ds = context.createDataset(data, e.STRING());
+    Dataset<String> ds = context.createDataset(data, Encoders.STRING());
     Assert.assertEquals("hello", ds.first());

     Dataset<String> filtered = ds.filter(new FilterFunction<String>() {
@@ -99,7 +98,7 @@ public boolean call(String v) throws Exception {
       public Integer call(String v) throws Exception {
         return v.length();
       }
-    }, e.INT());
+    }, Encoders.INT());
     Assert.assertEquals(Arrays.asList(5, 5), mapped.collectAsList());

     Dataset<String> parMapped = ds.mapPartitions(new MapPartitionsFunction<String, String>() {
@@ -111,7 +110,7 @@ public Iterable<String> call(Iterator<String> it) throws Exception {
         }
         return ls;
       }
-    }, e.STRING());
+    }, Encoders.STRING());
     Assert.assertEquals(Arrays.asList("HELLO", "WORLD"), parMapped.collectAsList());

     Dataset<String> flatMapped = ds.flatMap(new FlatMapFunction<String, String>() {
@@ -123,7 +122,7 @@ public Iterable<String> call(String s) throws Exception {
         }
         return ls;
       }
-    }, e.STRING());
+    }, Encoders.STRING());
     Assert.assertEquals(
       Arrays.asList("h", "e", "l", "l", "o", "w", "o", "r", "l", "d"),
       flatMapped.collectAsList());
@@ -133,7 +132,7 @@ public Iterable<String> call(String s) throws Exception {
   public void testForeach() {
     final Accumulator<Integer> accum = jsc.accumulator(0);
     List<String> data = Arrays.asList("a", "b", "c");
-    Dataset<String> ds = context.createDataset(data, e.STRING());
+    Dataset<String> ds = context.createDataset(data, Encoders.STRING());

     ds.foreach(new ForeachFunction<String>() {
       @Override
@@ -147,7 +146,7 @@ public void call(String s) throws Exception {
   @Test
   public void testReduce() {
     List<Integer> data = Arrays.asList(1, 2, 3);
-    Dataset<Integer> ds = context.createDataset(data, e.INT());
+    Dataset<Integer> ds = context.createDataset(data, Encoders.INT());

     int reduced = ds.reduce(new ReduceFunction<Integer>() {
       @Override
@@ -161,13 +160,13 @@ public Integer call(Integer v1, Integer v2) throws Exception {
   @Test
   public void testGroupBy() {
     List<String> data = Arrays.asList("a", "foo", "bar");
-    Dataset<String> ds = context.createDataset(data, e.STRING());
+    Dataset<String> ds = context.createDataset(data, Encoders.STRING());
     GroupedDataset<Integer, String> grouped = ds.groupBy(new MapFunction<String, Integer>() {
       @Override
       public Integer call(String v) throws Exception {
         return v.length();
       }
-    }, e.INT());
+    }, Encoders.INT());

     Dataset<String> mapped = grouped.map(new MapGroupFunction<Integer, String, String>() {
       @Override
@@ -178,7 +177,7 @@ public String call(Integer key, Iterator<String> values) throws Exception {
         }
         return sb.toString();
       }
-    }, e.STRING());
+    }, Encoders.STRING());

     Assert.assertEquals(Arrays.asList("1a", "3foobar"), mapped.collectAsList());

@@ -193,27 +192,27 @@ public Iterable<String> call(Integer key, Iterator<String> values) throws Exception {
         return Collections.singletonList(sb.toString());
       }
     },
-      e.STRING());
+      Encoders.STRING());

     Assert.assertEquals(Arrays.asList("1a", "3foobar"), flatMapped.collectAsList());

     List<Integer> data2 = Arrays.asList(2, 6, 10);
-    Dataset<Integer> ds2 = context.createDataset(data2, e.INT());
+    Dataset<Integer> ds2 = context.createDataset(data2, Encoders.INT());
     GroupedDataset<Integer, Integer> grouped2 = ds2.groupBy(new MapFunction<Integer, Integer>() {
       @Override
       public Integer call(Integer v) throws Exception {
         return v / 2;
       }
-    }, e.INT());
+    }, Encoders.INT());

     Dataset<String> cogrouped = grouped.cogroup(
       grouped2,
       new CoGroupFunction<Integer, String, Integer, String>() {
         @Override
         public Iterable<String> call(
-          Integer key,
-          Iterator<String> left,
-          Iterator<Integer> right) throws Exception {
+            Integer key,
+            Iterator<String> left,
+            Iterator<Integer> right) throws Exception {
           StringBuilder sb = new StringBuilder(key.toString());
           while (left.hasNext()) {
             sb.append(left.next());
@@ -225,7 +224,7 @@ public Iterable<String> call(
           return Collections.singletonList(sb.toString());
         }
       },
-      e.STRING());
+      Encoders.STRING());

     Assert.assertEquals(Arrays.asList("1a#2", "3foobar#6", "5#10"), cogrouped.collectAsList());
   }
@@ -233,8 +232,9 @@
   @Test
   public void testGroupByColumn() {
     List<String> data = Arrays.asList("a", "foo", "bar");
-    Dataset<String> ds = context.createDataset(data, e.STRING());
-    GroupedDataset<Integer, String> grouped = ds.groupBy(length(col("value"))).asKey(e.INT());
+    Dataset<String> ds = context.createDataset(data, Encoders.STRING());
+    GroupedDataset<Integer, String> grouped =
+      ds.groupBy(length(col("value"))).asKey(Encoders.INT());

     Dataset<String> mapped = grouped.map(
       new MapGroupFunction<Integer, String, String>() {
@@ -247,7 +247,7 @@ public String call(Integer key, Iterator<String> data) throws Exception {
          return sb.toString();
        }
      },
-      e.STRING());
+      Encoders.STRING());

     Assert.assertEquals(Arrays.asList("1a", "3foobar"), mapped.collectAsList());
   }
@@ -255,11 +255,11 @@ public String call(Integer key, Iterator<String> data) throws Exception {
   @Test
   public void testSelect() {
     List<Integer> data = Arrays.asList(2, 6);
-    Dataset<Integer> ds = context.createDataset(data, e.INT());
+    Dataset<Integer> ds = context.createDataset(data, Encoders.INT());

     Dataset<Tuple2<Integer, String>> selected = ds.select(
       expr("value + 1"),
-      col("value").cast("string")).as(e.tuple(e.INT(), e.STRING()));
+      col("value").cast("string")).as(Encoders.tuple(Encoders.INT(), Encoders.STRING()));

     Assert.assertEquals(
       Arrays.asList(tuple2(3, "2"), tuple2(7, "6")),
       selected.collectAsList());
@@ -269,14 +269,14 @@ public void testSelect() {
   @Test
   public void testSetOperation() {
     List<String> data = Arrays.asList("abc", "abc", "xyz");
-    Dataset<String> ds = context.createDataset(data, e.STRING());
+    Dataset<String> ds = context.createDataset(data, Encoders.STRING());

     Assert.assertEquals(
       Arrays.asList("abc", "xyz"),
       sort(ds.distinct().collectAsList().toArray(new String[0])));

     List<String> data2 = Arrays.asList("xyz", "foo", "foo");
-    Dataset<String> ds2 = context.createDataset(data2, e.STRING());
+    Dataset<String> ds2 = context.createDataset(data2, Encoders.STRING());

     Dataset<String> intersected = ds.intersect(ds2);
     Assert.assertEquals(Arrays.asList("xyz"), intersected.collectAsList());
@@ -298,9 +298,9 @@ private <T extends Comparable<T>> List<T> sort(T[] data) {
   @Test
   public void testJoin() {
     List<Integer> data = Arrays.asList(1, 2, 3);
-    Dataset<Integer> ds = context.createDataset(data, e.INT()).as("a");
+    Dataset<Integer> ds = context.createDataset(data, Encoders.INT()).as("a");
     List<Integer> data2 = Arrays.asList(2, 3, 4);
-    Dataset<Integer> ds2 = context.createDataset(data2, e.INT()).as("b");
+    Dataset<Integer> ds2 = context.createDataset(data2, Encoders.INT()).as("b");

     Dataset<Tuple2<Integer, Integer>> joined =
       ds.joinWith(ds2, col("a.value").equalTo(col("b.value")));
@@ -311,26 +311,28 @@ public void testJoin() {

   @Test
   public void testTupleEncoder() {
-    Encoder<Tuple2<Integer, String>> encoder2 = e.tuple(e.INT(), e.STRING());
+    Encoder<Tuple2<Integer, String>> encoder2 = Encoders.tuple(Encoders.INT(), Encoders.STRING());
     List<Tuple2<Integer, String>> data2 = Arrays.asList(tuple2(1, "a"), tuple2(2, "b"));
     Dataset<Tuple2<Integer, String>> ds2 = context.createDataset(data2, encoder2);
     Assert.assertEquals(data2, ds2.collectAsList());

-    Encoder<Tuple3<Integer, Long, String>> encoder3 = e.tuple(e.INT(), e.LONG(), e.STRING());
+    Encoder<Tuple3<Integer, Long, String>> encoder3 =
+      Encoders.tuple(Encoders.INT(), Encoders.LONG(), Encoders.STRING());
     List<Tuple3<Integer, Long, String>> data3 =
       Arrays.asList(new Tuple3<Integer, Long, String>(1, 2L, "a"));
     Dataset<Tuple3<Integer, Long, String>> ds3 = context.createDataset(data3, encoder3);
     Assert.assertEquals(data3, ds3.collectAsList());

     Encoder<Tuple4<Integer, String, Long, String>> encoder4 =
-      e.tuple(e.INT(), e.STRING(), e.LONG(), e.STRING());
+      Encoders.tuple(Encoders.INT(), Encoders.STRING(), Encoders.LONG(), Encoders.STRING());
     List<Tuple4<Integer, String, Long, String>> data4 =
       Arrays.asList(new Tuple4<Integer, String, Long, String>(1, "b", 2L, "a"));
     Dataset<Tuple4<Integer, String, Long, String>> ds4 = context.createDataset(data4, encoder4);
     Assert.assertEquals(data4, ds4.collectAsList());

     Encoder<Tuple5<Integer, String, Long, String, Boolean>> encoder5 =
-      e.tuple(e.INT(), e.STRING(), e.LONG(), e.STRING(), e.BOOLEAN());
+      Encoders.tuple(Encoders.INT(), Encoders.STRING(), Encoders.LONG(), Encoders.STRING(),
+        Encoders.BOOLEAN());
     List<Tuple5<Integer, String, Long, String, Boolean>> data5 =
       Arrays.asList(new Tuple5<Integer, String, Long, String, Boolean>(1, "b", 2L, "a", true));
     Dataset<Tuple5<Integer, String, Long, String, Boolean>> ds5 =
@@ -342,7 +344,7 @@ public void testNestedTupleEncoder() {
     // test ((int, string), string)
     Encoder<Tuple2<Tuple2<Integer, String>, String>> encoder =
-      e.tuple(e.tuple(e.INT(), e.STRING()), e.STRING());
+      Encoders.tuple(Encoders.tuple(Encoders.INT(), Encoders.STRING()), Encoders.STRING());
     List<Tuple2<Tuple2<Integer, String>, String>> data =
       Arrays.asList(tuple2(tuple2(1, "a"), "a"), tuple2(tuple2(2, "b"), "b"));
     Dataset<Tuple2<Tuple2<Integer, String>, String>> ds = context.createDataset(data, encoder);
@@ -350,7 +352,8 @@
     // test (int, (string, string, long))
     Encoder<Tuple2<Integer, Tuple3<String, String, Long>>> encoder2 =
-      e.tuple(e.INT(), e.tuple(e.STRING(), e.STRING(), e.LONG()));
+      Encoders.tuple(Encoders.INT(),
+        Encoders.tuple(Encoders.STRING(), Encoders.STRING(), Encoders.LONG()));
     List<Tuple2<Integer, Tuple3<String, String, Long>>> data2 =
       Arrays.asList(tuple2(1, new Tuple3<String, String, Long>("a", "b", 3L)));
     Dataset<Tuple2<Integer, Tuple3<String, String, Long>>> ds2 =
@@ -359,7 +362,8 @@
     // test (int, ((string, long), string))
     Encoder<Tuple2<Integer, Tuple2<Tuple2<String, Long>, String>>> encoder3 =
-      e.tuple(e.INT(), e.tuple(e.tuple(e.STRING(), e.LONG()), e.STRING()));
+      Encoders.tuple(Encoders.INT(),
+        Encoders.tuple(Encoders.tuple(Encoders.STRING(), Encoders.LONG()), Encoders.STRING()));
     List<Tuple2<Integer, Tuple2<Tuple2<String, Long>, String>>> data3 =
       Arrays.asList(tuple2(1, tuple2(tuple2("a", 2L), "b")));
     Dataset<Tuple2<Integer, Tuple2<Tuple2<String, Long>, String>>> ds3 =
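
---

Usage note (illustrative, not part of the patches): the series moves `Encoder` into `org.apache.spark.sql`, deletes the reflection-based `Encoder.forTuple` factories, and in PATCH 3/3 renames the factory object to `Encoders`, so Java code composes tuple encoders instead of passing `Class` objects. A minimal sketch of the resulting Java usage, limited to calls exercised by `JavaDatasetSuite` above; assume it runs inside a test method of that suite, with `context` the `TestSQLContext` created in `setUp()`:

    import java.util.Arrays;
    import java.util.List;

    import scala.Tuple2;

    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Encoder;
    import org.apache.spark.sql.Encoders;

    // Primitive encoders come from the Encoders factory object,
    // replacing the removed Encoder$.MODULE$ field ("e") in the suite.
    List<String> data = Arrays.asList("hello", "world");
    Dataset<String> ds = context.createDataset(data, Encoders.STRING());

    // Tuple encoders are built by composing component encoders (and can
    // nest), replacing the removed Class-based forTuple overloads.
    Encoder<Tuple2<Integer, String>> pairEncoder =
      Encoders.tuple(Encoders.INT(), Encoders.STRING());
    Dataset<Tuple2<Integer, String>> pairs = context.createDataset(
      Arrays.asList(new Tuple2<Integer, String>(1, "a")), pairEncoder);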