From 14fb29d1cc8b30681026ad29f7fc674695644a62 Mon Sep 17 00:00:00 2001 From: gatorsmile Date: Sun, 24 Jan 2016 15:56:30 -0800 Subject: [PATCH 1/7] remove unnecessary columns from blockBy --- .../apache/spark/sql/DataFrameWriter.scala | 30 +++++++++- .../sql/hive/MetastoreDataSourcesSuite.scala | 55 ++++++++++++++++++- .../sql/sources/BucketedWriteSuite.scala | 17 ++++++ 3 files changed, 99 insertions(+), 3 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala index ab63fe4aa88b..c565da101ced 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala @@ -21,6 +21,7 @@ import java.util.Properties import scala.collection.JavaConverters._ +import org.apache.spark.Logging import org.apache.spark.annotation.Experimental import org.apache.spark.sql.catalyst.{CatalystQl, TableIdentifier} import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation @@ -37,7 +38,7 @@ import org.apache.spark.sql.sources.HadoopFsRelation * @since 1.4.0 */ @Experimental -final class DataFrameWriter private[sql](df: DataFrame) { +final class DataFrameWriter private[sql](df: DataFrame) extends Logging { /** * Specifies the behavior when data or table already exists. Options include: @@ -232,6 +233,8 @@ final class DataFrameWriter private[sql](df: DataFrame) { } private def getBucketSpec: Option[BucketSpec] = { + + if (sortColumnNames.isDefined) { require(numBuckets.isDefined, "sortBy must be used together with bucketBy") } @@ -240,7 +243,30 @@ final class DataFrameWriter private[sql](df: DataFrame) { n <- numBuckets } yield { require(n > 0 && n < 100000, "Bucket number must be greater than 0 and less than 100000.") - BucketSpec(n, normalizedBucketColNames.get, normalizedSortColNames.getOrElse(Nil)) + + if (normalizedParCols.isEmpty) { + BucketSpec(n, normalizedBucketColNames.get, normalizedSortColNames.getOrElse(Nil)) + } else { + // When partitionBy and blockBy are used at the same time, the overlapping columns are + // useless. Thus, we removed these overlapping columns from blockBy. 
+ val bucketColumns: Seq[String] = + normalizedBucketColNames.get.filterNot(normalizedParCols.get.contains) + + if (bucketColumns.nonEmpty) { + if (bucketColumns.length != normalizedBucketColNames.get.length) { + val removedColumns: Seq[String] = + normalizedBucketColNames.get.filter(normalizedParCols.get.contains) + logInfo(s"BucketBy columns is changed to '${bucketColumnNames.mkString(", ")}' after " + + s"removing the columns '${removedColumns.mkString(", ")}' that are part of " + + s"PartitionBy columns '${partitioningColumns.mkString(", ")}'") + } + BucketSpec(n, bucketColumns, normalizedSortColNames.getOrElse(Nil)) + } else { + throw new AnalysisException(s"BucketBy columns (${bucketColumnNames.mkString(", ")}) " + + "should not be the subset of PartitionBy columns " + + s"'${partitioningColumns.mkString(", ")}'") + } + } } } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala index 253f13c59852..1311e1f91ad5 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala @@ -745,7 +745,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv } } - test("Saving partition columns information") { + test("Saving partitionBy columns information") { val df = (1 to 10).map(i => (i, i + 1, s"str$i", s"str${i + 1}")).toDF("a", "b", "c", "d") val tableName = s"partitionInfo_${System.currentTimeMillis()}" @@ -776,6 +776,59 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv } } + test("Saving information for sortBy and bucketBy columns") { + val df = (1 to 10).map(i => (i, i + 1, s"str$i", s"str${i + 1}")).toDF("a", "b", "c", "d") + val tableName = s"partitionInfo_${System.currentTimeMillis()}" + + withTable(tableName) { + df.write + .format("parquet") + .bucketBy(8, "d", "b") + .sortBy("c") + .saveAsTable(tableName) + invalidateTable(tableName) + val metastoreTable = catalog.client.getTable("default", tableName) + val expectedBlockByColumns = StructType(df.schema("d") :: df.schema("b") :: Nil) + val expectedSortByColumns = StructType(df.schema("c") :: Nil) + + val numBuckets = metastoreTable.properties("spark.sql.sources.schema.numBuckets").toInt + assert(numBuckets == 8) + + val numBucketCols = metastoreTable.properties("spark.sql.sources.schema.numBucketCols").toInt + assert(numBucketCols == 2) + + val numSortCols = metastoreTable.properties("spark.sql.sources.schema.numSortCols").toInt + assert(numSortCols == 1) + + val actualBlockByColumns = + StructType( + (0 until numBucketCols).map { index => + df.schema(metastoreTable.properties(s"spark.sql.sources.schema.bucketCol.$index")) + }) + // Make sure blockBy columns are correctly stored in metastore. + assert( + expectedBlockByColumns.sameType(actualBlockByColumns), + s"Partitions columns stored in metastore $actualBlockByColumns is not the " + + s"partition columns defined by the saveAsTable operation $expectedBlockByColumns.") + + val actualSortByColumns = + StructType( + (0 until numSortCols).map { index => + df.schema(metastoreTable.properties(s"spark.sql.sources.schema.sortCol.$index")) + }) + // Make sure sortBy columns are correctly stored in metastore. 
+ assert( + expectedSortByColumns.sameType(actualSortByColumns), + s"Partitions columns stored in metastore $actualSortByColumns is not the " + + s"partition columns defined by the saveAsTable operation $expectedSortByColumns.") + + // Check the content of the saved table. + checkAnswer( + table(tableName).select("c", "b", "d", "a"), + df.select("c", "b", "d", "a")) + } + } + test("insert into a table") { def createDF(from: Int, to: Int): DataFrame = { (from to to).map(i => i -> s"str$i").toDF("c1", "c2") diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedWriteSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedWriteSuite.scala index 59b74d2b4c5e..bb0bc80f0bdb 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedWriteSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedWriteSuite.scala @@ -158,6 +158,23 @@ class BucketedWriteSuite extends QueryTest with SQLTestUtils with TestHiveSingle } } + test("write bucketed data with the overlapping blockBy and partitionBy columns") { + for (source <- Seq("parquet", "json", "orc")) { + withTable("bucketed_table") { + df.write + .format(source) + .partitionBy("i") + .bucketBy(8, "i", "k") + .sortBy("k") + .saveAsTable("bucketed_table") + + for (i <- 0 until 5) { + testBucketing(new File(tableDir, s"i=$i"), source, 8, Seq("k"), Seq("k")) + } + } + } + } + test("write bucketed data without partitionBy") { for (source <- Seq("parquet", "json", "orc")) { withTable("bucketed_table") { From e68351bccd7911f55cade845918ecc2494271d2f Mon Sep 17 00:00:00 2001 From: gatorsmile Date: Sun, 24 Jan 2016 16:26:48 -0800 Subject: [PATCH 2/7] added more test cases. --- .../apache/spark/sql/DataFrameWriter.scala | 12 +++++------ .../sql/sources/BucketedWriteSuite.scala | 21 +++++++++++++++++-- 2 files changed, 25 insertions(+), 8 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala index c565da101ced..c3eb455ac368 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala @@ -256,15 +256,15 @@ final class DataFrameWriter private[sql](df: DataFrame) extends Logging { if (bucketColumns.length != normalizedBucketColNames.get.length) { val removedColumns: Seq[String] = normalizedBucketColNames.get.filter(normalizedParCols.get.contains) - logInfo(s"BucketBy columns is changed to '${bucketColumnNames.mkString(", ")}' after " + - s"removing the columns '${removedColumns.mkString(", ")}' that are part of " + - s"PartitionBy columns '${partitioningColumns.mkString(", ")}'") + logInfo(s"bucketBy columns is changed to '${bucketColumnNames.get.mkString(", ")}' " + + s"after removing the columns '${removedColumns.mkString(", ")}' that are part of " + + s"partitionBy columns '${partitioningColumns.get.mkString(", ")}'") } BucketSpec(n, bucketColumns, normalizedSortColNames.getOrElse(Nil)) } else { - throw new AnalysisException(s"BucketBy columns (${bucketColumnNames.mkString(", ")}) " + - "should not be the subset of PartitionBy columns " + - s"'${partitioningColumns.mkString(", ")}'") + throw new AnalysisException( + s"bucketBy columns '${bucketColumnNames.get.mkString(", ")}' should not be the " + + s"subset of partitionBy columns '${partitioningColumns.get.mkString(", ")}'") } } } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedWriteSuite.scala 
b/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedWriteSuite.scala index bb0bc80f0bdb..4061456195f2 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedWriteSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedWriteSuite.scala @@ -92,10 +92,13 @@ class BucketedWriteSuite extends QueryTest with SQLTestUtils with TestHiveSingle fail(s"Unable to find the related bucket files.") } + // Remove the duplicate columns in bucketCols and sortCols; + // Otherwise, we got analysis errors due to duplicate names + val selectedColumns = (bucketCols ++ sortCols).distinct // We may lose the type information after write(e.g. json format doesn't keep schema // information), here we get the types from the original dataframe. - val types = df.select((bucketCols ++ sortCols).map(col): _*).schema.map(_.dataType) - val columns = (bucketCols ++ sortCols).zip(types).map { + val types = df.select(selectedColumns.map(col): _*).schema.map(_.dataType) + val columns = selectedColumns.zip(types).map { case (colName, dt) => col(colName).cast(dt) } @@ -169,12 +172,26 @@ class BucketedWriteSuite extends QueryTest with SQLTestUtils with TestHiveSingle .saveAsTable("bucketed_table") for (i <- 0 until 5) { + // After column pruning, the actual bucketBy columns only contain `k`, which + // is identical to the sortBy column. testBucketing(new File(tableDir, s"i=$i"), source, 8, Seq("k"), Seq("k")) } } } } + test("write bucketed data with the identical blockBy and partitionBy columns") { + for (source <- Seq("parquet", "json", "orc")) { + withTable("bucketed_table") { + intercept[AnalysisException](df.write + .format(source) + .partitionBy("i") + .bucketBy(8, "i") + .saveAsTable("bucketed_table")) + } + } + } + test("write bucketed data without partitionBy") { for (source <- Seq("parquet", "json", "orc")) { withTable("bucketed_table") { From 8c718b30c228074c0cc81e5fa6c8243aaf976a54 Mon Sep 17 00:00:00 2001 From: gatorsmile Date: Sun, 24 Jan 2016 18:08:54 -0800 Subject: [PATCH 3/7] style fix. --- .../src/main/scala/org/apache/spark/sql/DataFrameWriter.scala | 2 -- 1 file changed, 2 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala index c3eb455ac368..91ff736cca1c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala @@ -233,8 +233,6 @@ final class DataFrameWriter private[sql](df: DataFrame) extends Logging { } private def getBucketSpec: Option[BucketSpec] = { - - if (sortColumnNames.isDefined) { require(numBuckets.isDefined, "sortBy must be used together with bucketBy") } From d207813497e3aa4002ef04b8eb62c200b01bed09 Mon Sep 17 00:00:00 2001 From: gatorsmile Date: Sun, 24 Jan 2016 22:59:26 -0800 Subject: [PATCH 4/7] address comments. 
--- .../apache/spark/sql/DataFrameWriter.scala | 28 ++++----------- .../sql/sources/BucketedWriteSuite.scala | 34 +++++-------------- 2 files changed, 16 insertions(+), 46 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala index 91ff736cca1c..2b3fde36225c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala @@ -242,29 +242,15 @@ final class DataFrameWriter private[sql](df: DataFrame) extends Logging { } yield { require(n > 0 && n < 100000, "Bucket number must be greater than 0 and less than 100000.") - if (normalizedParCols.isEmpty) { - BucketSpec(n, normalizedBucketColNames.get, normalizedSortColNames.getOrElse(Nil)) - } else { - // When partitionBy and blockBy are used at the same time, the overlapping columns are - // useless. Thus, we removed these overlapping columns from blockBy. - val bucketColumns: Seq[String] = - normalizedBucketColNames.get.filterNot(normalizedParCols.get.contains) - - if (bucketColumns.nonEmpty) { - if (bucketColumns.length != normalizedBucketColNames.get.length) { - val removedColumns: Seq[String] = - normalizedBucketColNames.get.filter(normalizedParCols.get.contains) - logInfo(s"bucketBy columns is changed to '${bucketColumnNames.get.mkString(", ")}' " + - s"after removing the columns '${removedColumns.mkString(", ")}' that are part of " + - s"partitionBy columns '${partitioningColumns.get.mkString(", ")}'") - } - BucketSpec(n, bucketColumns, normalizedSortColNames.getOrElse(Nil)) - } else { + // partitionBy columns cannot be used in blockedBy + if (normalizedParCols.nonEmpty && + normalizedBucketColNames.get.toSet.intersect(normalizedParCols.get.toSet).nonEmpty) { throw new AnalysisException( - s"bucketBy columns '${bucketColumnNames.get.mkString(", ")}' should not be the " + - s"subset of partitionBy columns '${partitioningColumns.get.mkString(", ")}'") - } + s"bucketBy columns '${bucketColumnNames.get.mkString(", ")}' should not be part of " + + s"partitionBy columns '${partitioningColumns.get.mkString(", ")}'") } + + BucketSpec(n, normalizedBucketColNames.get, normalizedSortColNames.getOrElse(Nil)) } } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedWriteSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedWriteSuite.scala index 4061456195f2..40a74e67a67b 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedWriteSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedWriteSuite.scala @@ -162,34 +162,18 @@ class BucketedWriteSuite extends QueryTest with SQLTestUtils with TestHiveSingle } test("write bucketed data with the overlapping blockBy and partitionBy columns") { - for (source <- Seq("parquet", "json", "orc")) { - withTable("bucketed_table") { - df.write - .format(source) - .partitionBy("i") - .bucketBy(8, "i", "k") - .sortBy("k") - .saveAsTable("bucketed_table") - - for (i <- 0 until 5) { - // After column pruning, the actual bucketBy columns only contain `k`, which - // is identical to the sortBy column. 
- testBucketing(new File(tableDir, s"i=$i"), source, 8, Seq("k"), Seq("k")) - } - } - } + intercept[AnalysisException](df.write + .partitionBy("i") + .bucketBy(8, "i", "k") + .sortBy("k") + .saveAsTable("bucketed_table")) } test("write bucketed data with the identical blockBy and partitionBy columns") { - for (source <- Seq("parquet", "json", "orc")) { - withTable("bucketed_table") { - intercept[AnalysisException](df.write - .format(source) - .partitionBy("i") - .bucketBy(8, "i") - .saveAsTable("bucketed_table")) - } - } + intercept[AnalysisException](df.write + .partitionBy("i") + .bucketBy(8, "i") + .saveAsTable("bucketed_table")) } test("write bucketed data without partitionBy") { From 64f5ea0d60dc22e0742ef3bb4b48f6669cdd1a5c Mon Sep 17 00:00:00 2001 From: gatorsmile Date: Mon, 25 Jan 2016 04:20:11 -0800 Subject: [PATCH 5/7] typo fix. --- .../scala/org/apache/spark/sql/DataFrameWriter.scala | 2 +- .../spark/sql/hive/MetastoreDataSourcesSuite.scala | 12 ++++++------ 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala index 2b3fde36225c..e4ce4cf5fed3 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala @@ -242,7 +242,7 @@ final class DataFrameWriter private[sql](df: DataFrame) extends Logging { } yield { require(n > 0 && n < 100000, "Bucket number must be greater than 0 and less than 100000.") - // partitionBy columns cannot be used in blockedBy + // partitionBy columns cannot be used in bucketBy if (normalizedParCols.nonEmpty && normalizedBucketColNames.get.toSet.intersect(normalizedParCols.get.toSet).nonEmpty) { throw new AnalysisException( diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala index 1311e1f91ad5..91320840faa6 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala @@ -788,7 +788,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv .saveAsTable(tableName) invalidateTable(tableName) val metastoreTable = catalog.client.getTable("default", tableName) - val expectedBlockByColumns = StructType(df.schema("d") :: df.schema("b") :: Nil) + val expectedBucketByColumns = StructType(df.schema("d") :: df.schema("b") :: Nil) val expectedSortByColumns = StructType(df.schema("c") :: Nil) val numBuckets = metastoreTable.properties("spark.sql.sources.schema.numBuckets").toInt @@ -800,16 +800,16 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv val numSortCols = metastoreTable.properties("spark.sql.sources.schema.numSortCols").toInt assert(numSortCols == 1) - val actualBlockByColumns = + val actualBucketByColumns = StructType( (0 until numBucketCols).map { index => df.schema(metastoreTable.properties(s"spark.sql.sources.schema.bucketCol.$index")) }) - // Make sure blockBy columns are correctly stored in metastore. + // Make sure bucketBy columns are correctly stored in metastore. 
assert( - expectedBlockByColumns.sameType(actualBlockByColumns), - s"Partitions columns stored in metastore $actualBlockByColumns is not the " + - s"partition columns defined by the saveAsTable operation $expectedBlockByColumns.") + expectedBucketByColumns.sameType(actualBucketByColumns), + s"Partitions columns stored in metastore $actualBucketByColumns is not the " + + s"partition columns defined by the saveAsTable operation $expectedBucketByColumns.") val actualSortByColumns = StructType( From f9a8bdf99b213c6bb0c2d5ed2108a7c31fd430b5 Mon Sep 17 00:00:00 2001 From: gatorsmile Date: Mon, 25 Jan 2016 04:21:25 -0800 Subject: [PATCH 6/7] typo fix. --- .../org/apache/spark/sql/sources/BucketedWriteSuite.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedWriteSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedWriteSuite.scala index 40a74e67a67b..03c6a7ac731c 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedWriteSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedWriteSuite.scala @@ -161,7 +161,7 @@ class BucketedWriteSuite extends QueryTest with SQLTestUtils with TestHiveSingle } } - test("write bucketed data with the overlapping blockBy and partitionBy columns") { + test("write bucketed data with the overlapping bucketBy and partitionBy columns") { intercept[AnalysisException](df.write .partitionBy("i") .bucketBy(8, "i", "k") @@ -169,7 +169,7 @@ class BucketedWriteSuite extends QueryTest with SQLTestUtils with TestHiveSingle .saveAsTable("bucketed_table")) } - test("write bucketed data with the identical blockBy and partitionBy columns") { + test("write bucketed data with the identical bucketBy and partitionBy columns") { intercept[AnalysisException](df.write .partitionBy("i") .bucketBy(8, "i") From 2ace09fde9bba59dfcbc5efd6a13f031b4f9eb15 Mon Sep 17 00:00:00 2001 From: gatorsmile Date: Mon, 25 Jan 2016 10:54:14 -0800 Subject: [PATCH 7/7] address comments. --- .../src/main/scala/org/apache/spark/sql/DataFrameWriter.scala | 3 +-- .../org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala | 2 +- .../org/apache/spark/sql/sources/BucketedWriteSuite.scala | 4 ++-- 3 files changed, 4 insertions(+), 5 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala index e4ce4cf5fed3..12eb2393634a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala @@ -21,7 +21,6 @@ import java.util.Properties import scala.collection.JavaConverters._ -import org.apache.spark.Logging import org.apache.spark.annotation.Experimental import org.apache.spark.sql.catalyst.{CatalystQl, TableIdentifier} import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation @@ -38,7 +37,7 @@ import org.apache.spark.sql.sources.HadoopFsRelation * @since 1.4.0 */ @Experimental -final class DataFrameWriter private[sql](df: DataFrame) extends Logging { +final class DataFrameWriter private[sql](df: DataFrame) { /** * Specifies the behavior when data or table already exists. 
Options include: diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala index 91320840faa6..211932fea00e 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala @@ -778,7 +778,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv test("Saving information for sortBy and bucketBy columns") { val df = (1 to 10).map(i => (i, i + 1, s"str$i", s"str${i + 1}")).toDF("a", "b", "c", "d") - val tableName = s"partitionInfo_${System.currentTimeMillis()}" + val tableName = s"bucketingInfo_${System.currentTimeMillis()}" withTable(tableName) { df.write diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedWriteSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedWriteSuite.scala index 03c6a7ac731c..a32f8fb4c5a1 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedWriteSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedWriteSuite.scala @@ -163,8 +163,8 @@ class BucketedWriteSuite extends QueryTest with SQLTestUtils with TestHiveSingle test("write bucketed data with the overlapping bucketBy and partitionBy columns") { intercept[AnalysisException](df.write - .partitionBy("i") - .bucketBy(8, "i", "k") + .partitionBy("i", "j") + .bucketBy(8, "j", "k") .sortBy("k") .saveAsTable("bucketed_table")) }
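
Note (reviewer illustration, not part of the patch series): the sketch below shows the net behavior these seven patches converge on. After PATCH 4/7, any overlap between the bucketBy and partitionBy column sets is rejected with an AnalysisException instead of being silently pruned from the bucketing spec, matching the new test cases in BucketedWriteSuite. This is a minimal sketch assuming a Spark build with the series applied; the table names, column names, and data are hypothetical.

import org.apache.spark.sql.{AnalysisException, SQLContext}

object BucketingOverlapSketch {
  // Hypothetical entry point; `sqlContext` would come from the application or test harness.
  def demo(sqlContext: SQLContext): Unit = {
    import sqlContext.implicits._
    val df = (1 to 10).map(i => (i, i + 1, s"str$i")).toDF("i", "j", "k")

    // Disjoint partitionBy / bucketBy columns: accepted, mirroring the
    // "Saving information for sortBy and bucketBy columns" test above.
    df.write
      .format("parquet")
      .partitionBy("i")
      .bucketBy(8, "j")
      .sortBy("k")
      .saveAsTable("bucketed_ok")

    // Overlapping columns ("i" appears in both sets): rejected with an AnalysisException,
    // e.g. "bucketBy columns 'i, k' should not be part of partitionBy columns 'i'".
    try {
      df.write
        .format("parquet")
        .partitionBy("i")
        .bucketBy(8, "i", "k")
        .sortBy("k")
        .saveAsTable("bucketed_bad")
    } catch {
      case e: AnalysisException => println(s"rejected as expected: ${e.getMessage}")
    }
  }
}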