From 3acf7f1efd694c7e3f40a5c5ce9757fd611768bd Mon Sep 17 00:00:00 2001
From: Kazuaki Ishizaki
Date: Wed, 15 Mar 2017 01:14:49 +0900
Subject: [PATCH 01/13] initial commit

---
 .../sql/execution/datasources/DataSource.scala |  2 +-
 .../org/apache/spark/sql/DataFrameSuite.scala  | 18 ++++++++++++++++++
 2 files changed, 19 insertions(+), 1 deletion(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
index 9fce29b06b9d..037478ee9d7d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -374,7 +374,7 @@ case class DataSource(
         HadoopFsRelation(
           fileCatalog,
           partitionSchema = partitionSchema,
-          dataSchema = dataSchema.asNullable,
+          dataSchema = dataSchema,
           bucketSpec = bucketSpec,
           format,
           caseInsensitiveOptions)(sparkSession)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
index 9ea9951c24ef..f0188afa64ac 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
@@ -737,6 +737,24 @@ class DataFrameSuite extends QueryTest with SharedSQLContext {
     }
   }
 
+  test("loadWithSchema") {
+    withTempDir { dir =>
+      val field = "id"
+      val df = spark.range(0, 5, 1, 1).toDF(field)
+      val result = Array(Row(0L), Row(1L), Row(2L), Row(3L), Row(4L))
+
+      Seq("parquet", "json").foreach { fmt =>
+        val path = new File(dir, fmt).getCanonicalPath
+
+        val schema = StructType(Seq(StructField(field, LongType, false)))
+        df.write.format(fmt).mode("overwrite").save(path)
+        val dfRead = spark.read.format(fmt).schema(schema).load(path)
+        assert(dfRead.collect === result)
+        assert(dfRead.schema.equals(schema))
+      }
+    }
+  }
+
   ignore("show") {
     // This test case is intended ignored, but to make sure it compiles correctly
     testData.select($"*").show()
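The first patch drops the `asNullable` call so that the nullability of a user-specified schema survives into `HadoopFsRelation`, and adds a test pinning that down. A minimal sketch of the intended behavior, runnable in `spark-shell` where `spark` is predefined (the `/tmp/ids` path is illustrative):

```scala
import org.apache.spark.sql.types.{LongType, StructField, StructType}

val schema = StructType(Seq(StructField("id", LongType, nullable = false)))
spark.range(0, 5).toDF("id").write.mode("overwrite").parquet("/tmp/ids")

// Before this patch the read-back schema was unconditionally forced to
// nullable = true; with it, the nullable = false supplied above is preserved.
val dfRead = spark.read.schema(schema).parquet("/tmp/ids")
assert(dfRead.schema == schema)
```
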
From 2851ccf874a2d23820ec93376a429016545d2969 Mon Sep 17 00:00:00 2001
From: Kazuaki Ishizaki
Date: Wed, 15 Mar 2017 04:38:28 +0900
Subject: [PATCH 02/13] fix test failure

---
 .../scala/org/apache/spark/sql/execution/command/DDLSuite.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
index e4dd077715d0..41af50744b95 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
@@ -404,7 +404,7 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils {
       pathToNonPartitionedTable,
       userSpecifiedSchema = None,
       userSpecifiedPartitionCols = partitionCols,
-      expectedSchema = new StructType().add("num", IntegerType).add("str", StringType),
+      expectedSchema = new StructType().add("num", IntegerType, false).add("str", StringType),
       expectedPartitionCols = Seq.empty[String])
   }
 }

From 95aea521dbdc024ba4762c5ec20a45192ef9bad7 Mon Sep 17 00:00:00 2001
From: Kazuaki Ishizaki
Date: Mon, 20 Mar 2017 13:14:46 +0900
Subject: [PATCH 03/13] add nullability check in loading Parquet data

---
 .../parquet/VectorizedParquetRecordReader.java | 12 ++++++++++++
 .../sql/execution/datasources/DataSource.scala |  3 ++-
 .../parquet/ParquetRowConverter.scala          |  5 +++++
 .../org/apache/spark/sql/DataFrameSuite.scala  | 18 ------------------
 4 files changed, 19 insertions(+), 19 deletions(-)

diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java
index 51bdf0f0f229..4b242ec3febf 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java
@@ -90,6 +90,11 @@ public class VectorizedParquetRecordReader extends SpecificParquetRecordReaderBa
    */
   private ColumnarBatch columnarBatch;
 
+  /**
+   * Schema corresponding to columnarBatch.
+   */
+  private StructType columnarBatchSchema;
+
   /**
    * If true, this class returns batches instead of rows.
    */
@@ -178,6 +183,7 @@ public void initBatch(MemoryMode memMode, StructType partitionColumns,
       }
     }
 
+    columnarBatchSchema = batchSchema;
     columnarBatch = ColumnarBatch.allocate(batchSchema, memMode);
     if (partitionColumns != null) {
       int partitionIdx = sparkSchema.fields().length;
@@ -228,6 +234,12 @@ public boolean nextBatch() throws IOException {
     for (int i = 0; i < columnReaders.length; ++i) {
       if (columnReaders[i] == null) continue;
       columnReaders[i].readBatch(num, columnarBatch.column(i));
+      StructField schema = columnarBatchSchema.fields()[i];
+      if (columnarBatch.column(i).anyNullsSet() && !schema.nullable()) {
+        throw new UnsupportedOperationException(
+          "Should not contain null for non-nullable " + schema.dataType() +
+          " schema at column index " + i);
+      }
     }
     rowsReturned += num;
     columnarBatch.setNumRows(num);
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
index 037478ee9d7d..842929f24000 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -374,7 +374,8 @@ case class DataSource(
         HadoopFsRelation(
           fileCatalog,
           partitionSchema = partitionSchema,
-          dataSchema = dataSchema,
+          dataSchema =
+            if (format.isInstanceOf[ParquetFileFormat]) dataSchema else dataSchema.asNullable,
           bucketSpec = bucketSpec,
           format,
           caseInsensitiveOptions)(sparkSession)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala
index 32e6c60cd976..504984fc2a9e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala
@@ -190,6 +190,11 @@ private[parquet] class ParquetRowConverter(
     var i = 0
     while (i < currentRow.numFields) {
       fieldConverters(i).updater.end()
+      if (currentRow.isNullAt(i) && !catalystType(i).nullable) {
+        throw new UnsupportedOperationException(
+          "Should not contain null for non-nullable " + catalystType(i).dataType +
+          " schema at column index " + i)
+      }
       i += 1
     }
     updater.set(currentRow)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
index f0188afa64ac..9ea9951c24ef 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
@@ -737,24 +737,6 @@ class DataFrameSuite extends QueryTest with SharedSQLContext {
     }
   }
 
-  test("loadWithSchema") {
-    withTempDir { dir =>
-      val field = "id"
-      val df = spark.range(0, 5, 1, 1).toDF(field)
-      val result = Array(Row(0L), Row(1L), Row(2L), Row(3L), Row(4L))
-
-      Seq("parquet", "json").foreach { fmt =>
-        val path = new File(dir, fmt).getCanonicalPath
-
-        val schema = StructType(Seq(StructField(field, LongType, false)))
-        df.write.format(fmt).mode("overwrite").save(path)
-        val dfRead = spark.read.format(fmt).schema(schema).load(path)
-        assert(dfRead.collect === result)
-        assert(dfRead.schema.equals(schema))
-      }
-    }
-  }
-
   ignore("show") {
     // This test case is intended ignored, but to make sure it compiles correctly
     testData.select($"*").show()
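With these checks in place, loading Parquet data that actually contains nulls under a non-nullable user-specified schema fails fast instead of silently yielding invalid rows. A sketch of the failure mode this guards against, runnable in `spark-shell` (the path is illustrative; the quoted message follows the exception text added above):

```scala
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{IntegerType, StructField, StructType}

// Write a Parquet file that really does contain a null.
val nullableSchema = StructType(Seq(StructField("v", IntegerType, nullable = true)))
val dfWithNulls = spark.createDataFrame(
  spark.sparkContext.parallelize(Seq(Row(1), Row(null), Row(3))), nullableSchema)
dfWithNulls.write.mode("overwrite").parquet("/tmp/with-nulls")

// Reading it back under a non-nullable schema now hits the new checks in
// both read paths and raises an UnsupportedOperationException like
// "Should not contain null for non-nullable IntegerType schema at column index 0".
val nonNullable = StructType(Seq(StructField("v", IntegerType, nullable = false)))
spark.read.schema(nonNullable).parquet("/tmp/with-nulls").collect()
```
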
From e3b76fe3925e5c79ce323fd7e71115592aaf81d6 Mon Sep 17 00:00:00 2001
From: Kazuaki Ishizaki
Date: Mon, 20 Mar 2017 19:04:33 +0900
Subject: [PATCH 04/13] update StructType.merge with updating nullable

---
 .../scala/org/apache/spark/sql/types/StructType.scala | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructType.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructType.scala
index 54006e20a3eb..4c4e99a7f2ff 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructType.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructType.scala
@@ -370,8 +370,8 @@ case class StructType(fields: Array[StructField]) extends DataType with Seq[Stru
  *
  * 1. If A and B have the same name and data type, they are merged to a field C with the same name
  *    and data type. C is nullable if and only if either A or B is nullable.
- * 2. If A doesn't exist in `that`, it's included in the result schema.
- * 3. If B doesn't exist in `this`, it's also included in the result schema.
+ * 2. If A doesn't exist in `that`, it's included in the result schema as a nullable field.
+ * 3. If B doesn't exist in `this`, it's also included in the result schema as a nullable field.
  * 4. Otherwise, `this` and `that` are considered as conflicting schemas and an exception would be
  *    thrown.
  */
@@ -473,7 +473,7 @@ object StructType extends AbstractDataType {
             nullable = leftNullable || rightNullable)
         }
         .orElse {
-          Some(leftField)
+          Some(leftField.copy(nullable = true))
         }
         .foreach(newFields += _)
     }
@@ -482,7 +482,7 @@ object StructType extends AbstractDataType {
     rightFields
       .filterNot(f => leftMapped.get(f.name).nonEmpty)
       .foreach { f =>
-        newFields += f
+        newFields += f.copy(nullable = true)
       }
 
     StructType(newFields)
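`StructType.merge` is package-private, so the easiest public way to observe the new rule is Parquet schema merging, which goes through it. A sketch in `spark-shell` (paths illustrative): a field present in only one side of the merge now comes back nullable, per rules 2 and 3 above, even though each part was written without nulls.

```scala
// Two Parquet directories with different columns: "a" exists only on the
// left, "b" only on the right.
spark.range(3).selectExpr("id", "id AS a").write.parquet("/tmp/merged/part=1")
spark.range(3).selectExpr("id", "id AS b").write.parquet("/tmp/merged/part=2")

// StructType.merge unions the two file schemas; since "a" and "b" each exist
// on one side only, both are marked nullable in the merged schema.
val merged = spark.read.option("mergeSchema", "true").parquet("/tmp/merged")
merged.printSchema()
```
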
From 134a9be4c95a0ed7a796248b7c3edd0848a15237 Mon Sep 17 00:00:00 2001
From: Kazuaki Ishizaki
Date: Mon, 20 Mar 2017 19:05:14 +0900
Subject: [PATCH 05/13] fix test failures by updating nullable

---
 .../org/apache/spark/sql/execution/PlannerSuite.scala        | 3 ++-
 .../sql/execution/datasources/parquet/ParquetIOSuite.scala   | 4 ++--
 2 files changed, 4 insertions(+), 3 deletions(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
index d02c8ffe33f0..eee9c1dee174 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
@@ -159,7 +159,8 @@ class PlannerSuite extends SharedSQLContext {
 
     withTempView("testPushed") {
       val exp = sql("select * from testPushed where key = 15").queryExecution.sparkPlan
-      assert(exp.toString.contains("PushedFilters: [IsNotNull(key), EqualTo(key,15)]"))
+      print(s"${exp.toString}\n")
+      assert(exp.toString.contains("PushedFilters: EqualTo(key,15)]"))
     }
   }
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
index 94a2f9a00b3f..fcfa2cecc209 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
@@ -458,8 +458,8 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext {
       readParquetFile(path.toString) { df =>
         assertResult(df.schema) {
           StructType(
-            StructField("a", BooleanType, nullable = true) ::
-            StructField("b", IntegerType, nullable = true) ::
+            StructField("a", BooleanType, nullable = false) ::
+            StructField("b", IntegerType, nullable = false) ::
             Nil)
         }
       }

From 1fe28951c922dc34c4db6cd4716eabf239aec675 Mon Sep 17 00:00:00 2001
From: Kazuaki Ishizaki
Date: Mon, 20 Mar 2017 22:12:51 +0900
Subject: [PATCH 06/13] fix test failure

---
 .../scala/org/apache/spark/sql/execution/PlannerSuite.scala | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
index eee9c1dee174..69f2aab33eae 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
@@ -159,8 +159,7 @@ class PlannerSuite extends SharedSQLContext {
 
     withTempView("testPushed") {
       val exp = sql("select * from testPushed where key = 15").queryExecution.sparkPlan
-      print(s"${exp.toString}\n")
-      assert(exp.toString.contains("PushedFilters: EqualTo(key,15)]"))
+      assert(exp.toString.contains("PushedFilters: [EqualTo(key,15)]"))
     }
   }
 }
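The expected `PushedFilters` string changes because, once `key` is non-nullable, the optimizer no longer needs the inferred `IsNotNull(key)` constraint, so only the equality predicate reaches the data source. A way to inspect this interactively, reusing `schema` and the illustrative `/tmp/ids` path from the first sketch:

```scala
// With a non-nullable id, the inferred IsNotNull(id) filter is no longer
// generated and only the equality predicate is pushed down to the scan.
val pushed = spark.read.schema(schema).parquet("/tmp/ids").filter("id = 15")
pushed.explain()  // expect something like: PushedFilters: [EqualTo(id,15)]
```
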
From 8a77c00f6002c148d6837f98ff879d9b220b35bf Mon Sep 17 00:00:00 2001
From: Kazuaki Ishizaki
Date: Tue, 21 Mar 2017 15:28:40 +0900
Subject: [PATCH 07/13] add test suite

---
 .../sql/test/DataFrameReaderWriterSuite.scala | 60 +++++++++++++++++++
 1 file changed, 60 insertions(+)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala
index 306aecb5bbc8..2b0d019371a8 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala
@@ -21,12 +21,14 @@ import java.io.File
 import java.util.Locale
 import java.util.concurrent.ConcurrentLinkedQueue
 
+import org.apache.commons.lang3.exception.ExceptionUtils
 import org.scalatest.BeforeAndAfter
 
 import org.apache.spark.internal.io.FileCommitProtocol.TaskCommitMessage
 import org.apache.spark.internal.io.HadoopMapReduceCommitProtocol
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.types._
 import org.apache.spark.util.Utils
@@ -123,6 +125,7 @@ class MessageCapturingCommitProtocol(jobId: String, path: String)
   }
 }
 
+case class PointStr(x: String, y: String)
 
 class DataFrameReaderWriterSuite extends QueryTest with SharedSQLContext with BeforeAndAfter {
   import testImplicits._
@@ -680,6 +683,63 @@ class DataFrameReaderWriterSuite extends QueryTest with SharedSQLContext with Be
     }
   }
 
+  private def readAndWriteWithSchema(schema: StructType,
+      df: DataFrame, result: Array[Row], dfNull: DataFrame): Unit = {
+    val fmt = "parquet"
+    withTempDir { dir =>
+      val path = new File(dir, "nonnull").getCanonicalPath
+      df.write.format(fmt).save(path)
+      val dfRead = spark.read.format(fmt).schema(schema).load(path)
+      checkAnswer(dfRead, result)
+      assert(dfRead.schema.equals(schema))
+
+      val pathNull = new File(dir, "null").getCanonicalPath
+      dfNull.write.format(fmt).save(pathNull)
+      val e = intercept[Exception] {
+        spark.read.format(fmt).schema(schema).load(pathNull).collect
+      }
+      assert(ExceptionUtils.getRootCause(e).isInstanceOf[UnsupportedOperationException] &&
+        e.getMessage.contains("Should not contain null for non-nullable"))
+    }
+  }
+
+  test("SPARK-19950: loadWithSchema") {
+    Seq("true", "false").foreach { vectorized =>
+      withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> vectorized) {
+        val dataInt = Seq(1, 2, 3)
+        val dfInt = sparkContext.parallelize(dataInt, 1).toDF("v")
+        val resultInt = dataInt.map(e => Row(e)).toArray
+        val schemaInt = StructType(Seq(StructField("v", IntegerType, false)))
+        val dfIntNull = sparkContext.parallelize(Seq[java.lang.Integer](1, null, 3), 1).toDF("v")
+        readAndWriteWithSchema(schemaInt, dfInt, resultInt, dfIntNull)
+
+        val dataDouble = Seq(1.1D, 2.2D, 3.3D)
+        val dfDouble = sparkContext.parallelize(dataDouble, 1).toDF("v")
+        val resultDouble = dataDouble.map(e => Row(e)).toArray
+        val schemaDouble = StructType(Seq(StructField("v", DoubleType, false)))
+        val dfDoubleNull = sparkContext.parallelize(Seq[java.lang.Double](1.1D, null, 3.3D), 1)
+          .toDF("v")
+        readAndWriteWithSchema(schemaDouble, dfDouble, resultDouble, dfDoubleNull)
+
+        val dataString = Seq("a", "b", "cd")
+        val dfString = sparkContext.parallelize(dataString, 1).toDF("v")
+        val resultString = dataString.map(e => Row(e)).toArray
+        val schemaString = StructType(Seq(StructField("v", StringType, false)))
+        val dfStringNull = sparkContext.parallelize(Seq("a", null, "cd"), 1).toDF("v")
+        readAndWriteWithSchema(schemaString, dfString, resultString, dfStringNull)
+
+        val dataCaseClass = Seq(PointStr("a", "b"), PointStr("c", "d"))
+        val dfCaseClass = sparkContext.parallelize(dataCaseClass, 1).toDF
+        val resultCaseClass = dataCaseClass.map(e => Row(e.x, e.y)).toArray
+        val schemaCaseClass = StructType(
+          Seq(StructField("x", StringType, false), StructField("y", StringType, false)))
+        val dfCaseClassNull = sparkContext.parallelize(
+          Seq(PointStr("a", "b"), PointStr("c", null)), 1).toDF
+        readAndWriteWithSchema(schemaCaseClass, dfCaseClass, resultCaseClass, dfCaseClassNull)
+      }
+    }
+  }
+
   test("SPARK-20431: Specify a schema by using a DDL-formatted string") {
     spark.createDataset(data).write.mode(SaveMode.Overwrite).text(dir)
     testRead(spark.read.schema(userSchemaString).text(), Seq.empty, userSchema)
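The suite runs every case twice because the two Parquet read paths patched earlier are selected by a single flag: the vectorized reader (`VectorizedParquetRecordReader`) when it is true, and the parquet-mr path through `ParquetRowConverter` when it is false. It also unwraps errors with `ExceptionUtils.getRootCause` because the reader's `UnsupportedOperationException` can reach the driver wrapped in a `SparkException`. The equivalent toggle outside the test harness:

```scala
// Same switch the suite flips via SQLConf.PARQUET_VECTORIZED_READER_ENABLED;
// set it to false to exercise the ParquetRowConverter path instead.
spark.conf.set("spark.sql.parquet.enableVectorizedReader", "false")
```
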
From 7b463e9046313491ab2635ff609336803c3a091c Mon Sep 17 00:00:00 2001
From: Kazuaki Ishizaki
Date: Tue, 21 Mar 2017 15:49:06 +0900
Subject: [PATCH 08/13] fix scala style error

---
 .../org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala
index 2b0d019371a8..e038dd39c61d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala
@@ -21,11 +21,12 @@ import java.io.File
 import java.util.Locale
 import java.util.concurrent.ConcurrentLinkedQueue
 
-import org.apache.commons.lang3.exception.ExceptionUtils
 import org.scalatest.BeforeAndAfter
 
 import org.apache.spark.internal.io.FileCommitProtocol.TaskCommitMessage
 import org.apache.spark.internal.io.HadoopMapReduceCommitProtocol
+import org.apache.commons.lang3.exception.ExceptionUtils
+
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.internal.SQLConf

From 84e4859c65f068b3950c0db5ca10eb7710c5364c Mon Sep 17 00:00:00 2001
From: Kazuaki Ishizaki
Date: Tue, 21 Mar 2017 16:27:12 +0900
Subject: [PATCH 09/13] fix scala style error

---
 .../org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala
index e038dd39c61d..1ceb5c04bd4a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala
@@ -27,6 +27,8 @@ import org.scalatest.BeforeAndAfter
 import org.apache.spark.internal.io.FileCommitProtocol.TaskCommitMessage
 import org.apache.spark.internal.io.HadoopMapReduceCommitProtocol
 import org.apache.commons.lang3.exception.ExceptionUtils
+
+import org.scalatest.BeforeAndAfter
 
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.TableIdentifier

From 0ae7d29ea4186892e812a402904772db0d902456 Mon Sep 17 00:00:00 2001
From: Kazuaki Ishizaki
Date: Tue, 21 Mar 2017 16:40:10 +0900
Subject: [PATCH 10/13] fix scala style error

---
 .../org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala | 1 -
 1 file changed, 1 deletion(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala
index 1ceb5c04bd4a..6b4ed0eda5d6 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala
@@ -26,7 +26,6 @@ import org.scalatest.BeforeAndAfter
 import org.apache.spark.internal.io.FileCommitProtocol.TaskCommitMessage
 import org.apache.spark.internal.io.HadoopMapReduceCommitProtocol
 import org.apache.commons.lang3.exception.ExceptionUtils
-
 import org.scalatest.BeforeAndAfter
 
 import org.apache.spark.sql._
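Patches 08, 09, and 10 (and 11 and 12 below) are all wrestling with Spark's scalastyle import-ordering rule: imports are grouped as java/javax, then scala, then third-party libraries, then `org.apache.spark`, with a blank line between groups and alphabetical order within each group. The layout the series eventually converges on looks like this sketch (a representative subset of the suite's imports):

```scala
import java.io.File
import java.util.Locale

import org.apache.commons.lang3.exception.ExceptionUtils
import org.scalatest.BeforeAndAfter

import org.apache.spark.sql._
import org.apache.spark.sql.internal.SQLConf
```
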
From af2521573d647308aaa222df55503253be61674b Mon Sep 17 00:00:00 2001
From: Kazuaki Ishizaki
Date: Tue, 30 May 2017 18:32:35 +0900
Subject: [PATCH 11/13] rebase master

---
 .../org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala | 1 -
 1 file changed, 1 deletion(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala
index 6b4ed0eda5d6..e038dd39c61d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala
@@ -26,7 +26,6 @@ import org.scalatest.BeforeAndAfter
 import org.apache.spark.internal.io.FileCommitProtocol.TaskCommitMessage
 import org.apache.spark.internal.io.HadoopMapReduceCommitProtocol
 import org.apache.commons.lang3.exception.ExceptionUtils
-import org.scalatest.BeforeAndAfter
 
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.TableIdentifier

From 9a24e00f4de72d093830780fb13bfe8821bc4442 Mon Sep 17 00:00:00 2001
From: Kazuaki Ishizaki
Date: Tue, 30 May 2017 18:53:16 +0900
Subject: [PATCH 12/13] fix scala style error

---
 .../org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala
index e038dd39c61d..2b0d019371a8 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala
@@ -21,12 +21,11 @@ import java.io.File
 import java.util.Locale
 import java.util.concurrent.ConcurrentLinkedQueue
 
+import org.apache.commons.lang3.exception.ExceptionUtils
 import org.scalatest.BeforeAndAfter
 
 import org.apache.spark.internal.io.FileCommitProtocol.TaskCommitMessage
 import org.apache.spark.internal.io.HadoopMapReduceCommitProtocol
-import org.apache.commons.lang3.exception.ExceptionUtils
-
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.internal.SQLConf
From 6837eb3a5438caba1d0319949fe7b0bcecef7a43 Mon Sep 17 00:00:00 2001
From: Kazuaki Ishizaki
Date: Wed, 31 May 2017 02:30:00 +0900
Subject: [PATCH 13/13] fix test failure

---
 .../org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
index c785aca98582..614643231723 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
@@ -1374,7 +1374,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
     checkAnswer(spark.table("old"), Row(1, "a"))
 
     val expectedSchema = StructType(Seq(
-      StructField("i", IntegerType, nullable = true),
+      StructField("i", IntegerType, nullable = false),
       StructField("j", StringType, nullable = true)))
     assert(table("old").schema === expectedSchema)
   }
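Taken together, the series means a metastore-backed data source table now reports the nullability the user declared. A closing sketch of the assertion above, assuming a table named `old` created with a non-nullable column `i` as in the surrounding test:

```scala
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

// Column "i" keeps nullable = false when the table is read back from the
// Hive metastore; "j" stays nullable.
val expected = StructType(Seq(
  StructField("i", IntegerType, nullable = false),
  StructField("j", StringType, nullable = true)))
assert(spark.table("old").schema == expected)
```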