From 95b3301a57f99f516f156b1553b5ac3777b1039c Mon Sep 17 00:00:00 2001
From: Cheng Lian
Date: Tue, 8 Apr 2014 23:00:03 +0800
Subject: [PATCH 01/20] Fixed bugs in IntegralDelta

---
 .../compression/compressionSchemes.scala | 20 ++++++++-----------
 .../compression/IntegralDeltaSuite.scala  | 15 ++++++++++----
 2 files changed, 19 insertions(+), 16 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/columnar/compression/compressionSchemes.scala b/sql/core/src/main/scala/org/apache/spark/sql/columnar/compression/compressionSchemes.scala
index df8220b556edd..98dd2e83c9479 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/columnar/compression/compressionSchemes.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/columnar/compression/compressionSchemes.scala
@@ -396,26 +396,27 @@ private[sql] sealed abstract class IntegralDelta[I <: IntegralType] extends Comp
       if (initial) {
         initial = false
-        prev = value
         _compressedSize += 1 + columnType.defaultSize
       } else {
         val (smallEnough, _) = byteSizedDelta(value, prev)
         _compressedSize += (if (smallEnough) 1 else 1 + columnType.defaultSize)
       }
+
+      prev = value
     }

     override def compress(from: ByteBuffer, to: ByteBuffer, columnType: NativeColumnType[I]) = {
       to.putInt(typeId)

       if (from.hasRemaining) {
-        val prev = columnType.extract(from)
-
+        var prev = columnType.extract(from)
         to.put(Byte.MinValue)
         columnType.append(prev, to)

         while (from.hasRemaining) {
           val current = columnType.extract(from)
           val (smallEnough, delta) = byteSizedDelta(current, prev)
+          prev = current

           if (smallEnough) {
             to.put(delta)
@@ -442,13 +443,8 @@ private[sql] sealed abstract class IntegralDelta[I <: IntegralType] extends Comp
     override def next() = {
       val delta = buffer.get()
-
-      if (delta > Byte.MinValue) {
-        addDelta(prev, delta)
-      } else {
-        prev = columnType.extract(buffer)
-        prev
-      }
+      prev = if (delta > Byte.MinValue) addDelta(prev, delta) else columnType.extract(buffer)
+      prev
     }

     override def hasNext = buffer.hasRemaining
@@ -464,7 +460,7 @@ private[sql] case object IntDelta extends IntegralDelta[IntegerType.type] {
   override protected def byteSizedDelta(x: Int, y: Int): (Boolean, Byte) = {
     val delta = x - y
-    if (delta < Byte.MaxValue) (true, delta.toByte) else (false, 0: Byte)
+    if (math.abs(delta) <= Byte.MaxValue) (true, delta.toByte) else (false, 0: Byte)
   }
 }
@@ -477,6 +473,6 @@ private[sql] case object LongDelta extends IntegralDelta[LongType.type] {
   override protected def byteSizedDelta(x: Long, y: Long): (Boolean, Byte) = {
     val delta = x - y
-    if (delta < Byte.MaxValue) (true, delta.toByte) else (false, 0: Byte)
+    if (math.abs(delta) <= Byte.MaxValue) (true, delta.toByte) else (false, 0: Byte)
   }
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/columnar/compression/IntegralDeltaSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/columnar/compression/IntegralDeltaSuite.scala
index 1390e5eef6106..ce419ca7269ba 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/columnar/compression/IntegralDeltaSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/columnar/compression/IntegralDeltaSuite.scala
@@ -22,6 +22,7 @@ import org.scalatest.FunSuite
 import org.apache.spark.sql.catalyst.expressions.GenericMutableRow
 import org.apache.spark.sql.catalyst.types.IntegralType
 import org.apache.spark.sql.columnar._
+import org.apache.spark.sql.columnar.ColumnarTestUtils._

 class IntegralDeltaSuite extends FunSuite {
   testIntegralDelta(new IntColumnStats, INT, IntDelta)
@@ -63,7 +64,7 @@ class IntegralDeltaSuite extends FunSuite {
       } else {
         val oneBoolean = columnType.defaultSize
         1 + oneBoolean + deltas.map {
-          d => if (math.abs(d) < Byte.MaxValue) 1 else 1 + oneBoolean
+          d => if (math.abs(d) <= Byte.MaxValue) 1 else 1 + oneBoolean
         }.sum
       })
@@ -78,7 +79,7 @@ class IntegralDeltaSuite extends FunSuite {
       expectResult(input.head, "The first value is wrong")(columnType.extract(buffer))

       (input.tail, deltas).zipped.foreach { (value, delta) =>
-        if (delta < Byte.MaxValue) {
+        if (math.abs(delta) <= Byte.MaxValue) {
           expectResult(delta, "Wrong delta")(buffer.get())
         } else {
           expectResult(Byte.MinValue, "Expecting escaping mark here")(buffer.get())
@@ -105,11 +106,17 @@ class IntegralDeltaSuite extends FunSuite {
     test(s"$scheme: simple case") {
       val input = columnType match {
-        case INT => Seq(1: Int, 2: Int, 130: Int)
-        case LONG => Seq(1: Long, 2: Long, 130: Long)
+        case INT => Seq(2: Int, 1: Int, 2: Int, 130: Int)
+        case LONG => Seq(2: Long, 1: Long, 2: Long, 130: Long)
       }

       skeleton(input.map(_.asInstanceOf[I#JvmType]))
     }
+
+    test(s"$scheme: long random series") {
+      // Have to workaround with `Any` since no `ClassTag[I#JvmType]` available here.
+      val input = Array.fill[Any](10000)(makeRandomValue(columnType))
+      skeleton(input.map(_.asInstanceOf[I#JvmType]))
+    }
   }
 }

From 052bf416304ec10f5d23c82223a61aa0cbcf1716 Mon Sep 17 00:00:00 2001
From: Cheng Lian
Date: Tue, 8 Apr 2014 23:48:19 +0800
Subject: [PATCH 02/20] Bug fix: should only gather compressibility info for non-null values

---
 .../sql/columnar/compression/CompressibleColumnBuilder.scala | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/columnar/compression/CompressibleColumnBuilder.scala b/sql/core/src/main/scala/org/apache/spark/sql/columnar/compression/CompressibleColumnBuilder.scala
index fd3b1adf9687a..0f808f68f2eec 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/columnar/compression/CompressibleColumnBuilder.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/columnar/compression/CompressibleColumnBuilder.scala
@@ -65,7 +65,9 @@ private[sql] trait CompressibleColumnBuilder[T <: NativeType]
   abstract override def appendFrom(row: Row, ordinal: Int) {
     super.appendFrom(row, ordinal)
-    gatherCompressibilityStats(row, ordinal)
+    if (!row.isNullAt(ordinal)) {
+      gatherCompressibilityStats(row, ordinal)
+    }
   }

   abstract override def build() = {

From 44591a55dc626aaaa5dad737e0fcafef6ebb9777 Mon Sep 17 00:00:00 2001
From: Cheng Lian
Date: Wed, 9 Apr 2014 18:57:26 +0800
Subject: [PATCH 03/20] Bug fix: NullableColumnAccessor.hasNext must take nulls into account

---
 .../apache/spark/sql/columnar/NullableColumnAccessor.scala   | 2 ++
 .../spark/sql/columnar/NullableColumnAccessorSuite.scala     | 4 ++++
 2 files changed, 6 insertions(+)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/columnar/NullableColumnAccessor.scala b/sql/core/src/main/scala/org/apache/spark/sql/columnar/NullableColumnAccessor.scala
index 7d49ab07f7a53..b7f8826861a2c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/columnar/NullableColumnAccessor.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/columnar/NullableColumnAccessor.scala
@@ -54,4 +54,6 @@ private[sql] trait NullableColumnAccessor extends ColumnAccessor {
     pos += 1
   }
+
+  abstract override def hasNext = seenNulls < nullCount || super.hasNext
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/columnar/NullableColumnAccessorSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/columnar/NullableColumnAccessorSuite.scala
index 4a21eb6201a69..35ab14cbc353d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/columnar/NullableColumnAccessorSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/columnar/NullableColumnAccessorSuite.scala
@@ -68,12 +68,16 @@ class NullableColumnAccessorSuite extends FunSuite {
       val row = new GenericMutableRow(1)

       (0 until 4).foreach { _ =>
+        assert(accessor.hasNext)
         accessor.extractTo(row, 0)
         assert(row(0) === randomRow(0))

+        assert(accessor.hasNext)
         accessor.extractTo(row, 0)
         assert(row.isNullAt(0))
       }
+
+      assert(!accessor.hasNext)
     }
   }
 }

From 036cd096e0d62350562c78e061f35b7f634d66b5 Mon Sep 17 00:00:00 2001
From: Cheng Lian
Date: Wed, 9 Apr 2014 18:59:06 +0800
Subject: [PATCH 04/20] Clean up unused imports

---
 .../test/scala/org/apache/spark/sql/CachedTableSuite.scala | 5 ++---
 1 file changed, 2 insertions(+), 3 deletions(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
index 7c6a642278226..0331f90272a99 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
@@ -17,11 +17,10 @@
 package org.apache.spark.sql

-import org.scalatest.FunSuite
 import org.apache.spark.sql.TestData._
-import org.apache.spark.sql.test.TestSQLContext
-import org.apache.spark.sql.execution.SparkLogicalPlan
 import org.apache.spark.sql.columnar.InMemoryColumnarTableScan
+import org.apache.spark.sql.execution.SparkLogicalPlan
+import org.apache.spark.sql.test.TestSQLContext

 class CachedTableSuite extends QueryTest {
   TestData // Load test tables.

From 8426ddcb38a2513dce27b3e7da9af9518679b0ac Mon Sep 17 00:00:00 2001
From: Cheng Lian
Date: Wed, 9 Apr 2014 19:01:58 +0800
Subject: [PATCH 05/20] Bug fix: InMemoryColumnarTableScan should cache columns specified by the attributes argument

---
 .../sql/columnar/InMemoryColumnarTableScan.scala      |  8 +++++---
 ...ySuite.scala => InMemoryColumnarQuerySuite.scala}  | 12 +++++++++++-
 2 files changed, 16 insertions(+), 4 deletions(-)
 rename sql/core/src/test/scala/org/apache/spark/sql/columnar/{ColumnarQuerySuite.scala => InMemoryColumnarQuerySuite.scala} (79%)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala b/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala
index 8a24733047423..b6282d7457f9e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala
@@ -27,9 +27,11 @@ private[sql] case class InMemoryColumnarTableScan(attributes: Seq[Attribute], ch
   override def output: Seq[Attribute] = attributes

   lazy val cachedColumnBuffers = {
+    val ordinals = attributes.map(a => child.output.indexWhere(_.name == a.name))
     val output = child.output
     val cached = child.execute().mapPartitions { iterator =>
-      val columnBuilders = output.map { attribute =>
+      val columnBuilders = ordinals.map { i =>
+        val attribute = output(i)
         ColumnBuilder(ColumnType(attribute.dataType).typeId, 0, attribute.name)
       }.toArray
@@ -37,8 +39,8 @@ private[sql] case class InMemoryColumnarTableScan(attributes: Seq[Attribute], ch
       while (iterator.hasNext) {
         row = iterator.next()
         var i = 0
-        while (i < row.length) {
-          columnBuilders(i).appendFrom(row, i)
+        while (i < ordinals.length) {
+          columnBuilders(i).appendFrom(row, ordinals(i))
           i += 1
         }
       }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnarQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/columnar/InMemoryColumnarQuerySuite.scala
similarity index 79%
rename from sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnarQuerySuite.scala
rename to sql/core/src/test/scala/org/apache/spark/sql/columnar/InMemoryColumnarQuerySuite.scala
index 2ed4cf2170f9d..16a13b8a74960 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnarQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/columnar/InMemoryColumnarQuerySuite.scala
@@ -18,10 +18,11 @@
 package org.apache.spark.sql.columnar

 import org.apache.spark.sql.{QueryTest, TestData}
+import org.apache.spark.sql.catalyst.expressions.Row
 import org.apache.spark.sql.execution.SparkLogicalPlan
 import org.apache.spark.sql.test.TestSQLContext

-class ColumnarQuerySuite extends QueryTest {
+class InMemoryColumnarQuerySuite extends QueryTest {
   import TestData._
   import TestSQLContext._
@@ -32,6 +33,15 @@ class ColumnarQuerySuite extends QueryTest {
     checkAnswer(scan, testData.collect().toSeq)
   }

+  test("projection") {
+    val plan = TestSQLContext.executePlan(testData.select('value, 'key).logicalPlan).executedPlan
+    val scan = SparkLogicalPlan(InMemoryColumnarTableScan(plan.output, plan))
+
+    checkAnswer(scan, testData.collect().map {
+      case Row(key: Int, value: String) => value -> key
+    }.toSeq)
+  }
+
   test("SPARK-1436 regression: in-memory columns must be able to be accessed multiple times") {
     val plan = TestSQLContext.executePlan(testData.logicalPlan).executedPlan
     val scan = SparkLogicalPlan(InMemoryColumnarTableScan(plan.output, plan))

From e619995291c1ab9bcbddbba1f5091b35bd42f49b Mon Sep 17 00:00:00 2001
From: Cheng Lian
Date: Wed, 9 Apr 2014 19:16:58 +0800
Subject: [PATCH 06/20] Bug fix: incorrect byte order in CompressionScheme.columnHeaderSize

---
 .../spark/sql/columnar/compression/CompressionScheme.scala | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/columnar/compression/CompressionScheme.scala b/sql/core/src/main/scala/org/apache/spark/sql/columnar/compression/CompressionScheme.scala
index c605a8e4434e3..ba1810dd2ae66 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/columnar/compression/CompressionScheme.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/columnar/compression/CompressionScheme.scala
@@ -17,7 +17,7 @@
 package org.apache.spark.sql.columnar.compression

-import java.nio.ByteBuffer
+import java.nio.{ByteOrder, ByteBuffer}

 import org.apache.spark.sql.catalyst.types.NativeType
 import org.apache.spark.sql.columnar.{ColumnType, NativeColumnType}
@@ -84,7 +84,7 @@ private[sql] object CompressionScheme {
   }

   def columnHeaderSize(columnBuffer: ByteBuffer): Int = {
-    val header = columnBuffer.duplicate()
+    val header = columnBuffer.duplicate().order(ByteOrder.nativeOrder)
     val nullCount = header.getInt(4)
     // Column type ID + null count + null positions
     4 + 4 + 4 * nullCount
   }

From 9c8fc4055375284dd93d14edb69955e15597c561 Mon Sep 17 00:00:00 2001
From: Cheng Lian
Date: Thu, 10 Apr 2014 06:01:56 +0800
Subject: [PATCH 07/20] Disable compression by default

---
 .../sql/columnar/compression/CompressionScheme.scala | 8 +++++++-
 1 file changed, 7 insertions(+), 1 deletion(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/columnar/compression/CompressionScheme.scala b/sql/core/src/main/scala/org/apache/spark/sql/columnar/compression/CompressionScheme.scala
index ba1810dd2ae66..76a08e2e00b44 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/columnar/compression/CompressionScheme.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/columnar/compression/CompressionScheme.scala
@@ -57,8 +57,14 @@ private[sql] trait AllCompressionSchemes extends WithCompressionSchemes {
 }

 private[sql] object CompressionScheme {
-  val all: Seq[CompressionScheme] =
+  val compressionEnabled =
+    System.getProperty("spark.sql.inMemoryCompression.enabled", "false").toBoolean
+
+  val all: Seq[CompressionScheme] = if (compressionEnabled) {
     Seq(PassThrough, RunLengthEncoding, DictionaryEncoding, BooleanBitSet, IntDelta, LongDelta)
+  } else {
+    Seq(PassThrough)
+  }

   private val typeIdToScheme = all.map(scheme => scheme.typeId -> scheme).toMap

From c9b0f6f878207ed9bd2a7dbd4f4f333c5c21c456 Mon Sep 17 00:00:00 2001
From: Cheng Lian
Date: Thu, 10 Apr 2014 06:03:44 +0800
Subject: [PATCH 08/20] Let InsertIntoTable support InMemoryColumnarTableScan

---
 .../main/scala/org/apache/spark/sql/hive/HiveStrategies.scala | 4 ++++
 1 file changed, 4 insertions(+)

diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
index 3ca1d93c11fa9..ac817b21a152e 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
@@ -23,6 +23,7 @@ import org.apache.spark.sql.catalyst.planning._
 import org.apache.spark.sql.catalyst.plans._
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution._
+import org.apache.spark.sql.columnar.InMemoryColumnarTableScan

 trait HiveStrategies {
   // Possibly being too clever with types here... or not clever enough.
@@ -42,6 +43,9 @@ trait HiveStrategies {
     def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
       case logical.InsertIntoTable(table: MetastoreRelation, partition, child, overwrite) =>
         InsertIntoHiveTable(table, partition, planLater(child), overwrite)(hiveContext) :: Nil
+      case logical.InsertIntoTable(SparkLogicalPlan(InMemoryColumnarTableScan(
+          _, HiveTableScan(_, table, _))), partition, child, overwrite) =>
+        InsertIntoHiveTable(table, partition, planLater(child), overwrite)(hiveContext) :: Nil
       case _ => Nil
     }
   }

From 6360723e684334da0e975f2e13aad8c36bd53c06 Mon Sep 17 00:00:00 2001
From: Cheng Lian
Date: Thu, 10 Apr 2014 07:14:27 +0800
Subject: [PATCH 09/20] Made PreInsertionCasts support SparkLogicalPlan and InMemoryColumnarTableScan

---
 .../spark/sql/hive/HiveMetastoreCatalog.scala | 42 ++++++++++++-------
 1 file changed, 26 insertions(+), 16 deletions(-)

diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index fc053c56c052d..c36b5878cb007 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -33,6 +33,8 @@ import org.apache.spark.sql.catalyst.plans.logical
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules._
 import org.apache.spark.sql.catalyst.types._
+import org.apache.spark.sql.execution.SparkLogicalPlan
+import org.apache.spark.sql.columnar.InMemoryColumnarTableScan

 /* Implicit conversions */
 import scala.collection.JavaConversions._
@@ -115,23 +117,31 @@ class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with Logging {
       case p: LogicalPlan if !p.childrenResolved => p

       case p @ InsertIntoTable(table: MetastoreRelation, _, child, _) =>
-        val childOutputDataTypes = child.output.map(_.dataType)
-        // Only check attributes, not partitionKeys since they are always strings.
-        // TODO: Fully support inserting into partitioned tables.
-        val tableOutputDataTypes = table.attributes.map(_.dataType)
-
-        if (childOutputDataTypes == tableOutputDataTypes) {
-          p
-        } else {
-          // Only do the casting when child output data types differ from table output data types.
-          val castedChildOutput = child.output.zip(table.output).map {
-            case (input, output) if input.dataType != output.dataType =>
-              Alias(Cast(input, output.dataType), input.name)()
-            case (input, _) => input
-          }
-
-          p.copy(child = logical.Project(castedChildOutput, child))
+        castChildOutput(p, table, child)
+
+      case p @ logical.InsertIntoTable(SparkLogicalPlan(InMemoryColumnarTableScan(
+          _, HiveTableScan(_, table, _))), _, child, _) =>
+        castChildOutput(p, table, child)
+    }
+
+    def castChildOutput(p: InsertIntoTable, table: MetastoreRelation, child: LogicalPlan) = {
+      val childOutputDataTypes = child.output.map(_.dataType)
+      // Only check attributes, not partitionKeys since they are always strings.
+      // TODO: Fully support inserting into partitioned tables.
+      val tableOutputDataTypes = table.attributes.map(_.dataType)
+
+      if (childOutputDataTypes == tableOutputDataTypes) {
+        p
+      } else {
+        // Only do the casting when child output data types differ from table output data types.
+        val castedChildOutput = child.output.zip(table.output).map {
+          case (input, output) if input.dataType != output.dataType =>
+            Alias(Cast(input, output.dataType), input.name)()
+          case (input, _) => input
         }
+
+        p.copy(child = logical.Project(castedChildOutput, child))
+      }
     }
   }

From 2d0e168f6189cafa39a99f3388dff4fd1578974c Mon Sep 17 00:00:00 2001
From: Michael Armbrust
Date: Mon, 7 Apr 2014 23:01:50 -0700
Subject: [PATCH 10/20] Run Hive tests in-memory too.

---
 .../scala/org/apache/spark/sql/hive/TestHive.scala   |  4 ++++
 .../sql/hive/execution/HiveCompatibilitySuite.scala  | 11 +++++++++++
 2 files changed, 15 insertions(+)

diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala
index 2fea9702954d7..661a3f1576e2d 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala
@@ -257,6 +257,7 @@ class TestHiveContext(sc: SparkContext) extends LocalHiveContext(sc) {
   private val loadedTables = new collection.mutable.HashSet[String]

+  var cacheTables: Boolean = false
   def loadTestTable(name: String) {
     if (!(loadedTables contains name)) {
       // Marks the table as loaded first to prevent infite mutually recursive table loading.
@@ -265,6 +266,9 @@ class TestHiveContext(sc: SparkContext) extends LocalHiveContext(sc) {
       val createCmds =
         testTables.get(name).map(_.commands).getOrElse(sys.error(s"Unknown test table $name"))
       createCmds.foreach(_())
+
+      if (cacheTables)
+        cacheTable(name)
     }
   }

diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala
index f76e16bc1afc5..d11b119f40067 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala
@@ -18,6 +18,17 @@
 package org.apache.spark.sql.hive.execution

 import org.apache.spark.sql.hive.TestHive
+import org.scalatest.BeforeAndAfter
+
+class HiveInMemoryCompatibilitySuite extends HiveCompatibilitySuite with BeforeAndAfter {
+  override def beforeAll() {
+    TestHive.cacheTables = true
+  }
+
+  override def afterAll() {
+    TestHive.cacheTables = false
+  }
+}

 /**
  * Runs the test cases that are included in the hive distribution.

From e36cdd08e5d17cf0d5e850f33d81ca9e93ae1ddd Mon Sep 17 00:00:00 2001
From: Michael Armbrust
Date: Wed, 9 Apr 2014 19:05:54 -0700
Subject: [PATCH 11/20] Spelling.

---
 sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala
index 16da7fd92bffe..91500416eefaa 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala
@@ -99,7 +99,7 @@ class SchemaRDD(
   def baseSchemaRDD = this

   // =========================================================================================
-  // RDD functions: Copy the interal row representation so we present immutable data to users.
+  // RDD functions: Copy the internal row representation so we present immutable data to users.
   // =========================================================================================

   override def compute(split: Partition, context: TaskContext): Iterator[Row] =

From 1965123e81bc581216c70ce62af55167bab8b66d Mon Sep 17 00:00:00 2001
From: Michael Armbrust
Date: Wed, 9 Apr 2014 19:06:39 -0700
Subject: [PATCH 12/20] Don't use coalesce for gathering all data to a single partition, as it does not work correctly with mutable rows.
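
A minimal sketch of why this matters, assuming a made-up `MutableRowLike` class in place
of Spark SQL's reused mutable rows (this is illustrative Scala, not Spark code): buffering
references to a recycled object loses data, while copying each row before it is buffered
does not.

    // Hypothetical stand-in for a mutable Row that an operator reuses for every output row.
    case class MutableRowLike(var value: Int)

    object MutableRowDemo extends App {
      val reused = MutableRowLike(0)

      // Each element yielded by this iterator is the *same* object, freshly overwritten.
      val rows = Iterator(1, 2, 3).map { v => reused.value = v; reused }

      // Buffering the iterator keeps three references to one object, so every element
      // reports the last value written: List(3, 3, 3).
      println(rows.toList.map(_.value))

      // Copying (or serializing) each row before it is buffered preserves the data: List(1, 2, 3).
      val reusedAgain = MutableRowLike(0)
      val copied = Iterator(1, 2, 3).map { v => reusedAgain.value = v; reusedAgain.copy() }
      println(copied.toList.map(_.value))
    }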
---
 .../scala/org/apache/spark/sql/execution/Exchange.scala | 9 ++++++++-
 1 file changed, 8 insertions(+), 1 deletion(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala
index 450c142c0baa4..070557e47c4c7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala
@@ -61,7 +61,14 @@ case class Exchange(newPartitioning: Partitioning, child: SparkPlan) extends Una
       shuffled.map(_._1)

     case SinglePartition =>
-      child.execute().coalesce(1, shuffle = true)
+      val rdd = child.execute().mapPartitions { iter =>
+        val mutablePair = new MutablePair[Null, Row]()
+        iter.map(r => mutablePair.update(null, r))
+      }
+      val partitioner = new HashPartitioner(1)
+      val shuffled = new ShuffledRDD[Null, Row, MutablePair[Null, Row]](rdd, partitioner)
+      shuffled.setSerializer(new SparkSqlSerializer(new SparkConf(false)))
+      shuffled.map(_._2)

     case _ => sys.error(s"Exchange not implemented for $newPartitioning")
     // TODO: Handle BroadcastPartitioning.

From ab9e8078ba699fa76244b946db49d63b8f2c720d Mon Sep 17 00:00:00 2001
From: Michael Armbrust
Date: Wed, 9 Apr 2014 19:07:34 -0700
Subject: [PATCH 13/20] Fix the logged console version of failed test cases to use the new syntax.

---
 .../spark/sql/hive/execution/HiveComparisonTest.scala | 8 +-------
 1 file changed, 1 insertion(+), 7 deletions(-)

diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala
index 3cc4562a88d66..9370c478bb589 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala
@@ -218,10 +218,7 @@ abstract class HiveComparisonTest
     val quotes = "\"\"\""
     queryList.zipWithIndex.map {
       case (query, i) =>
-        s"""
-          |val q$i = $quotes$query$quotes.q
-          |q$i.stringResult()
-        """.stripMargin
+        s"""val q$i = hql($quotes$query$quotes); q$i.collect()"""
     }.mkString("\n== Console version of this test ==\n", "\n", "\n")
   }
@@ -287,7 +284,6 @@ abstract class HiveComparisonTest
              |Error: ${e.getMessage}
              |${stackTraceToString(e)}
              |$queryString
-             |$consoleTestCase
            """.stripMargin
          stringToFile(
            new File(hiveFailedDirectory, testCaseName),
@@ -313,8 +309,6 @@ abstract class HiveComparisonTest
               |$query
               |== HIVE - ${hive.size} row(s) ==
               |${hive.mkString("\n")}
-              |
-              |$consoleTestCase
             """.stripMargin
           stringToFile(new File(failedDirectory, testCaseName), errorMessage + consoleTestCase)
           fail(errorMessage)

From d1df4fde2e69c417236619b283c1292477c5e0a8 Mon Sep 17 00:00:00 2001
From: Michael Armbrust
Date: Wed, 9 Apr 2014 19:12:40 -0700
Subject: [PATCH 14/20] Remove test tables that might always get created anyway?
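
Presumably the dest1/dest2/dest3 tables are created on demand by the compatibility test
queries themselves, which would make the eager TestTable registrations redundant. As a
hypothetical illustration (not taken from the actual test files), such a query would
typically be issued through TestHive along these lines:

    // Hypothetical example: a test creates its own destination table before inserting into it.
    TestHive.hql("CREATE TABLE IF NOT EXISTS dest1 (key INT, value STRING)")
    TestHive.hql("FROM src INSERT OVERWRITE TABLE dest1 SELECT src.key, src.value WHERE src.key < 100")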
---
 .../src/main/scala/org/apache/spark/sql/hive/TestHive.scala | 6 ------
 1 file changed, 6 deletions(-)

diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala
index 661a3f1576e2d..465e5f146fe71 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala
@@ -160,12 +160,6 @@ class TestHiveContext(sc: SparkContext) extends LocalHiveContext(sc) {
     TestTable("src1",
       "CREATE TABLE src1 (key INT, value STRING)".cmd,
       s"LOAD DATA LOCAL INPATH '${getHiveFile("data/files/kv3.txt")}' INTO TABLE src1".cmd),
-    TestTable("dest1",
-      "CREATE TABLE IF NOT EXISTS dest1 (key INT, value STRING)".cmd),
-    TestTable("dest2",
-      "CREATE TABLE IF NOT EXISTS dest2 (key INT, value STRING)".cmd),
-    TestTable("dest3",
-      "CREATE TABLE IF NOT EXISTS dest3 (key INT, value STRING)".cmd),
     TestTable("srcpart", () => {
       runSqlHive(
         "CREATE TABLE srcpart (key INT, value STRING) PARTITIONED BY (ds STRING, hr STRING)")

From 4390bcca3fa1ea4ff3874871a7fc23acb796334e Mon Sep 17 00:00:00 2001
From: Cheng Lian
Date: Fri, 11 Apr 2014 09:50:54 +0800
Subject: [PATCH 15/20] Report error for any Throwable in HiveComparisonTest

---
 .../apache/spark/sql/hive/execution/HiveComparisonTest.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala
index 9370c478bb589..6c91f40d0f925 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala
@@ -300,7 +300,7 @@ abstract class HiveComparisonTest
     val catalystResults = queryList.zip(hiveResults).map { case (queryString, hive) =>
       val query = new TestHive.HiveQLQueryExecution(queryString)
       try { (query, prepareAnswer(query, query.stringResult())) } catch {
-        case e: Exception =>
+        case e: Throwable =>
          val errorMessage =
            s"""
              |Failed to execute query using catalyst:

From 99382bf49e7a26ff77eb451314d6a5d8e0f5d11c Mon Sep 17 00:00:00 2001
From: Cheng Lian
Date: Fri, 11 Apr 2014 09:51:52 +0800
Subject: [PATCH 16/20] Enable compression by default

---
 .../sql/columnar/compression/CompressionScheme.scala | 8 +-------
 1 file changed, 1 insertion(+), 7 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/columnar/compression/CompressionScheme.scala b/sql/core/src/main/scala/org/apache/spark/sql/columnar/compression/CompressionScheme.scala
index 76a08e2e00b44..ba1810dd2ae66 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/columnar/compression/CompressionScheme.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/columnar/compression/CompressionScheme.scala
@@ -57,14 +57,8 @@ private[sql] trait AllCompressionSchemes extends WithCompressionSchemes {
 }

 private[sql] object CompressionScheme {
-  val compressionEnabled =
-    System.getProperty("spark.sql.inMemoryCompression.enabled", "false").toBoolean
-
-  val all: Seq[CompressionScheme] = if (compressionEnabled) {
+  val all: Seq[CompressionScheme] =
     Seq(PassThrough, RunLengthEncoding, DictionaryEncoding, BooleanBitSet, IntDelta, LongDelta)
-  } else {
-    Seq(PassThrough)
-  }

   private val typeIdToScheme = all.map(scheme => scheme.typeId -> scheme).toMap

From 32cc9ce7e91440bae3556853ffb0ff7a1f0e61ba Mon Sep 17 00:00:00 2001
From: Cheng Lian
Date: Fri, 11 Apr 2014 09:52:17 +0800
Subject: [PATCH 17/20] Code style cleanup

---
 .../org/apache/spark/sql/hive/hiveUdfs.scala | 16 ++++++++--------
 1 file changed, 8 insertions(+), 8 deletions(-)

diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUdfs.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUdfs.scala
index f9b437d435eba..55a4363af6c76 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUdfs.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUdfs.scala
@@ -130,8 +130,7 @@ trait HiveFunctionFactory {
     }
   }
 }

-abstract class HiveUdf
-  extends Expression with Logging with HiveFunctionFactory {
+abstract class HiveUdf extends Expression with Logging with HiveFunctionFactory {
   self: Product =>

   type UDFType
@@ -146,7 +145,7 @@ abstract class HiveUdf
   lazy val functionInfo = getFunctionInfo(name)
   lazy val function = createFunction[UDFType](name)

-  override def toString = s"${nodeName}#${functionInfo.getDisplayName}(${children.mkString(",")})"
+  override def toString = s"$nodeName#${functionInfo.getDisplayName}(${children.mkString(",")})"
 }

 case class HiveSimpleUdf(name: String, children: Seq[Expression]) extends HiveUdf {
@@ -202,10 +201,11 @@ case class HiveSimpleUdf(name: String, children: Seq[Expression]) extends HiveUd
   }
 }

-case class HiveGenericUdf(
-    name: String,
-    children: Seq[Expression]) extends HiveUdf with HiveInspectors {
+case class HiveGenericUdf(name: String, children: Seq[Expression])
+  extends HiveUdf with HiveInspectors {
+
   import org.apache.hadoop.hive.ql.udf.generic.GenericUDF._
+
   type UDFType = GenericUDF

   @transient
@@ -357,7 +357,7 @@ case class HiveGenericUdaf(
   override def toString = s"$nodeName#$name(${children.mkString(",")})"

-  def newInstance = new HiveUdafFunction(name, children, this)
+  def newInstance() = new HiveUdafFunction(name, children, this)
 }

 /**
@@ -435,7 +435,7 @@ case class HiveGenericUdtf(
     }
   }

-  override def toString() = s"$nodeName#$name(${children.mkString(",")})"
+  override def toString = s"$nodeName#$name(${children.mkString(",")})"
 }

 case class HiveUdafFunction(

From 882c538653c1a796f771ed2cbd53d1f9888b10fb Mon Sep 17 00:00:00 2001
From: Cheng Lian
Date: Fri, 11 Apr 2014 09:52:58 +0800
Subject: [PATCH 18/20] Remove attributes field from InMemoryColumnarTableScan

---
 .../scala/org/apache/spark/sql/SQLContext.scala     |  4 ++--
 .../sql/columnar/InMemoryColumnarTableScan.scala    | 14 ++++++--------
 .../org/apache/spark/sql/execution/SparkPlan.scala  |  3 +--
 .../sql/columnar/InMemoryColumnarQuerySuite.scala   |  6 +++---
 .../spark/sql/hive/HiveMetastoreCatalog.scala       |  2 +-
 .../org/apache/spark/sql/hive/HiveStrategies.scala  |  2 +-
 6 files changed, 14 insertions(+), 17 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
index d3d4c56bafe41..50ceace993d53 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -121,7 +121,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
   def cacheTable(tableName: String): Unit = {
     val currentTable = catalog.lookupRelation(None, tableName)
     val asInMemoryRelation =
-      InMemoryColumnarTableScan(currentTable.output, executePlan(currentTable).executedPlan)
+      InMemoryColumnarTableScan(executePlan(currentTable).executedPlan)

     catalog.registerTable(None, tableName, SparkLogicalPlan(asInMemoryRelation))
   }
@@ -131,7 +131,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
     EliminateAnalysisOperators(catalog.lookupRelation(None, tableName)) match {
       // This is kind of a hack to make sure that if this was just an RDD registered as a table,
       // we reregister the RDD as a table.
-      case SparkLogicalPlan(inMem @ InMemoryColumnarTableScan(_, e: ExistingRdd)) =>
+      case SparkLogicalPlan(inMem @ InMemoryColumnarTableScan(e: ExistingRdd)) =>
         inMem.cachedColumnBuffers.unpersist()
         catalog.unregisterTable(None, tableName)
         catalog.registerTable(None, tableName, SparkLogicalPlan(e))
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala b/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala
index b6282d7457f9e..c4fc4239d5bb7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala
@@ -21,17 +21,15 @@ import org.apache.spark.sql.catalyst.expressions.{GenericMutableRow, Attribute}
 import org.apache.spark.sql.execution.{SparkPlan, LeafNode}
 import org.apache.spark.sql.Row

-private[sql] case class InMemoryColumnarTableScan(attributes: Seq[Attribute], child: SparkPlan)
+private[sql] case class InMemoryColumnarTableScan(child: SparkPlan)
   extends LeafNode {

-  override def output: Seq[Attribute] = attributes
+  override def output: Seq[Attribute] = child.output

   lazy val cachedColumnBuffers = {
-    val output = child.output
+    val childOutput = child.output
     val cached = child.execute().mapPartitions { iterator =>
-      val columnBuilders = output.map { attribute =>
+      val columnBuilders = childOutput.map { attribute =>
         ColumnBuilder(ColumnType(attribute.dataType).typeId, 0, attribute.name)
       }.toArray
@@ -39,8 +37,8 @@ private[sql] case class InMemoryColumnarTableScan(child: SparkPlan)
       while (iterator.hasNext) {
         row = iterator.next()
         var i = 0
-        while (i < ordinals.length) {
-          columnBuilders(i).appendFrom(row, ordinals(i))
+        while (i < childOutput.length) {
+          columnBuilders(i).appendFrom(row, i)
           i += 1
         }
       }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
index daa423cb8ea1a..8ea7db90fae60 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
@@ -70,8 +70,7 @@ case class SparkLogicalPlan(alreadyPlanned: SparkPlan)
     SparkLogicalPlan(
       alreadyPlanned match {
         case ExistingRdd(output, rdd) => ExistingRdd(output.map(_.newInstance), rdd)
-        case InMemoryColumnarTableScan(output, child) =>
-          InMemoryColumnarTableScan(output.map(_.newInstance), child)
+        case scan @ InMemoryColumnarTableScan(child) => scan
         case _ => sys.error("Multiple instance of the same relation detected.")
       }).asInstanceOf[this.type]
   }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/columnar/InMemoryColumnarQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/columnar/InMemoryColumnarQuerySuite.scala
index 16a13b8a74960..180b9276ee29f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/columnar/InMemoryColumnarQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/columnar/InMemoryColumnarQuerySuite.scala
@@ -28,14 +28,14 @@ class InMemoryColumnarQuerySuite extends QueryTest {
   test("simple columnar query") {
     val plan = TestSQLContext.executePlan(testData.logicalPlan).executedPlan
-    val scan = SparkLogicalPlan(InMemoryColumnarTableScan(plan.output, plan))
+    val scan = SparkLogicalPlan(InMemoryColumnarTableScan(plan))

     checkAnswer(scan, testData.collect().toSeq)
   }

   test("projection") {
     val plan = TestSQLContext.executePlan(testData.select('value, 'key).logicalPlan).executedPlan
-    val scan = SparkLogicalPlan(InMemoryColumnarTableScan(plan.output, plan))
+    val scan = SparkLogicalPlan(InMemoryColumnarTableScan(plan))

     checkAnswer(scan, testData.collect().map {
       case Row(key: Int, value: String) => value -> key
@@ -44,7 +44,7 @@ class InMemoryColumnarQuerySuite extends QueryTest {
   test("SPARK-1436 regression: in-memory columns must be able to be accessed multiple times") {
     val plan = TestSQLContext.executePlan(testData.logicalPlan).executedPlan
-    val scan = SparkLogicalPlan(InMemoryColumnarTableScan(plan.output, plan))
+    val scan = SparkLogicalPlan(InMemoryColumnarTableScan(plan))

     checkAnswer(scan, testData.collect().toSeq)
     checkAnswer(scan, testData.collect().toSeq)
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index c36b5878cb007..e015fd65c108c 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -120,7 +120,7 @@ class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with Logging {
         castChildOutput(p, table, child)

       case p @ logical.InsertIntoTable(SparkLogicalPlan(InMemoryColumnarTableScan(
-          _, HiveTableScan(_, table, _))), _, child, _) =>
+          HiveTableScan(_, table, _))), _, child, _) =>
         castChildOutput(p, table, child)
     }
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
index ac817b21a152e..54a2052e46a6f 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
@@ -44,7 +44,7 @@ trait HiveStrategies {
       case logical.InsertIntoTable(table: MetastoreRelation, partition, child, overwrite) =>
         InsertIntoHiveTable(table, partition, planLater(child), overwrite)(hiveContext) :: Nil
       case logical.InsertIntoTable(SparkLogicalPlan(InMemoryColumnarTableScan(
-          _, HiveTableScan(_, table, _))), partition, child, overwrite) =>
+          HiveTableScan(_, table, _))), partition, child, overwrite) =>
         InsertIntoHiveTable(table, partition, planLater(child), overwrite)(hiveContext) :: Nil
       case _ => Nil
     }

From 5bdbfe7170e0bfbb09a7b43aec04dc4a1ee866f2 Mon Sep 17 00:00:00 2001
From: Cheng Lian
Date: Fri, 11 Apr 2014 12:35:17 +0800
Subject: [PATCH 19/20] Revert 882c538 & 8426ddc, which introduced regression

---
 .../main/scala/org/apache/spark/sql/SQLContext.scala    |  4 ++--
 .../spark/sql/columnar/InMemoryColumnarTableScan.scala  | 10 +++++-----
 .../org/apache/spark/sql/execution/SparkPlan.scala      |  3 ++-
 .../sql/columnar/InMemoryColumnarQuerySuite.scala       |  6 +++---
 .../apache/spark/sql/hive/HiveMetastoreCatalog.scala    |  2 +-
 .../org/apache/spark/sql/hive/HiveStrategies.scala      |  2 +-
 6 files changed, 14 insertions(+), 13 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
index 50ceace993d53..d3d4c56bafe41 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -121,7 +121,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
   def cacheTable(tableName: String): Unit = {
     val currentTable = catalog.lookupRelation(None, tableName)
     val asInMemoryRelation =
-      InMemoryColumnarTableScan(executePlan(currentTable).executedPlan)
+      InMemoryColumnarTableScan(currentTable.output, executePlan(currentTable).executedPlan)

     catalog.registerTable(None, tableName, SparkLogicalPlan(asInMemoryRelation))
   }
@@ -131,7 +131,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
     EliminateAnalysisOperators(catalog.lookupRelation(None, tableName)) match {
       // This is kind of a hack to make sure that if this was just an RDD registered as a table,
       // we reregister the RDD as a table.
-      case SparkLogicalPlan(inMem @ InMemoryColumnarTableScan(e: ExistingRdd)) =>
+      case SparkLogicalPlan(inMem @ InMemoryColumnarTableScan(_, e: ExistingRdd)) =>
         inMem.cachedColumnBuffers.unpersist()
         catalog.unregisterTable(None, tableName)
         catalog.registerTable(None, tableName, SparkLogicalPlan(e))
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala b/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala
index c4fc4239d5bb7..8a24733047423 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala
@@ -21,15 +21,15 @@ import org.apache.spark.sql.catalyst.expressions.{GenericMutableRow, Attribute}
 import org.apache.spark.sql.execution.{SparkPlan, LeafNode}
 import org.apache.spark.sql.Row

-private[sql] case class InMemoryColumnarTableScan(child: SparkPlan)
+private[sql] case class InMemoryColumnarTableScan(attributes: Seq[Attribute], child: SparkPlan)
   extends LeafNode {

-  override def output: Seq[Attribute] = child.output
+  override def output: Seq[Attribute] = attributes

   lazy val cachedColumnBuffers = {
-    val childOutput = child.output
+    val output = child.output
     val cached = child.execute().mapPartitions { iterator =>
-      val columnBuilders = childOutput.map { attribute =>
+      val columnBuilders = output.map { attribute =>
         ColumnBuilder(ColumnType(attribute.dataType).typeId, 0, attribute.name)
       }.toArray
@@ -37,7 +37,7 @@ private[sql] case class InMemoryColumnarTableScan(child: SparkPlan)
       while (iterator.hasNext) {
         row = iterator.next()
         var i = 0
-        while (i < childOutput.length) {
+        while (i < row.length) {
           columnBuilders(i).appendFrom(row, i)
           i += 1
         }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
index 8ea7db90fae60..5d89697db5f99 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
@@ -70,7 +70,8 @@ case class SparkLogicalPlan(alreadyPlanned: SparkPlan)
     SparkLogicalPlan(
       alreadyPlanned match {
         case ExistingRdd(output, rdd) => ExistingRdd(output.map(_.newInstance), rdd)
-        case scan @ InMemoryColumnarTableScan(child) => scan
+        case scan @ InMemoryColumnarTableScan(output, child) =>
+          scan.copy(attributes = output.map(_.newInstance))
         case _ => sys.error("Multiple instance of the same relation detected.")
       }).asInstanceOf[this.type]
   }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/columnar/InMemoryColumnarQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/columnar/InMemoryColumnarQuerySuite.scala
index 180b9276ee29f..16a13b8a74960 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/columnar/InMemoryColumnarQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/columnar/InMemoryColumnarQuerySuite.scala
@@ -28,14 +28,14 @@ class InMemoryColumnarQuerySuite extends QueryTest {
   test("simple columnar query") {
     val plan = TestSQLContext.executePlan(testData.logicalPlan).executedPlan
-    val scan = SparkLogicalPlan(InMemoryColumnarTableScan(plan))
+    val scan = SparkLogicalPlan(InMemoryColumnarTableScan(plan.output, plan))

     checkAnswer(scan, testData.collect().toSeq)
   }

   test("projection") {
     val plan = TestSQLContext.executePlan(testData.select('value, 'key).logicalPlan).executedPlan
-    val scan = SparkLogicalPlan(InMemoryColumnarTableScan(plan))
+    val scan = SparkLogicalPlan(InMemoryColumnarTableScan(plan.output, plan))

     checkAnswer(scan, testData.collect().map {
       case Row(key: Int, value: String) => value -> key
@@ -44,7 +44,7 @@ class InMemoryColumnarQuerySuite extends QueryTest {
   test("SPARK-1436 regression: in-memory columns must be able to be accessed multiple times") {
     val plan = TestSQLContext.executePlan(testData.logicalPlan).executedPlan
-    val scan = SparkLogicalPlan(InMemoryColumnarTableScan(plan))
+    val scan = SparkLogicalPlan(InMemoryColumnarTableScan(plan.output, plan))

     checkAnswer(scan, testData.collect().toSeq)
     checkAnswer(scan, testData.collect().toSeq)
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index e015fd65c108c..c36b5878cb007 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -120,7 +120,7 @@ class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with Logging {
         castChildOutput(p, table, child)

       case p @ logical.InsertIntoTable(SparkLogicalPlan(InMemoryColumnarTableScan(
-          HiveTableScan(_, table, _))), _, child, _) =>
+          _, HiveTableScan(_, table, _))), _, child, _) =>
         castChildOutput(p, table, child)
     }
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
index 54a2052e46a6f..ac817b21a152e 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
@@ -44,7 +44,7 @@ trait HiveStrategies {
       case logical.InsertIntoTable(table: MetastoreRelation, partition, child, overwrite) =>
         InsertIntoHiveTable(table, partition, planLater(child), overwrite)(hiveContext) :: Nil
       case logical.InsertIntoTable(SparkLogicalPlan(InMemoryColumnarTableScan(
-          HiveTableScan(_, table, _))), partition, child, overwrite) =>
+          _, HiveTableScan(_, table, _))), partition, child, overwrite) =>
         InsertIntoHiveTable(table, partition, planLater(child), overwrite)(hiveContext) :: Nil
       case _ => Nil
     }

From 6ad6d9bafac3ebb69086558b4783aed5f3590b50 Mon Sep 17 00:00:00 2001
From: Cheng Lian
Date: Sat, 12 Apr 2014 02:23:44 +0800
Subject: [PATCH 20/20] Merged HiveCompatibilitySuite and HiveInMemoryCompatibilitySuite

---
 .../execution/HiveCompatibilitySuite.scala | 21 +++++++++----------
 1 file changed, 10 insertions(+), 11 deletions(-)

diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala
index d11b119f40067..c3cfa3d25a5c2 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala
@@ -17,27 +17,26 @@
 package org.apache.spark.sql.hive.execution

-import org.apache.spark.sql.hive.TestHive
 import org.scalatest.BeforeAndAfter

-class HiveInMemoryCompatibilitySuite extends HiveCompatibilitySuite with BeforeAndAfter {
-  override def beforeAll() {
-    TestHive.cacheTables = true
-  }
-
-  override def afterAll() {
-    TestHive.cacheTables = false
-  }
-}
+import org.apache.spark.sql.hive.TestHive

 /**
  * Runs the test cases that are included in the hive distribution.
  */
-class HiveCompatibilitySuite extends HiveQueryFileTest {
+class HiveCompatibilitySuite extends HiveQueryFileTest with BeforeAndAfter {
   // TODO: bundle in jar files... get from classpath
   lazy val hiveQueryDir = TestHive.getHiveFile("ql/src/test/queries/clientpositive")
   def testCases = hiveQueryDir.listFiles.map(f => f.getName.stripSuffix(".q") -> f)

+  override def beforeAll() {
+    TestHive.cacheTables = true
+  }
+
+  override def afterAll() {
+    TestHive.cacheTables = false
+  }
+
   /** A list of tests deemed out of scope currently and thus completely disregarded. */
   override def blackList = Seq(
     // These tests use hooks that are not on the classpath and thus break all subsequent execution.