From b5aea7f3824b5b76723f4616a264e3c41a6acdc1 Mon Sep 17 00:00:00 2001 From: Gengliang Wang Date: Mon, 25 Mar 2019 17:52:05 +0800 Subject: [PATCH 1/7] refactor --- .../execution/datasources/v2/FileScan.scala | 26 +----------- .../datasources/v2/FileScanBuilder.scala | 4 +- .../execution/datasources/v2/FileTable.scala | 40 +++++++++++++++---- .../datasources/v2/FileWriteBuilder.scala | 22 +++------- .../datasources/v2/csv/CSVDataSourceV2.scala | 10 ----- .../datasources/v2/csv/CSVScan.scala | 6 --- .../datasources/v2/csv/CSVTable.scala | 14 ++++++- .../datasources/v2/csv/CSVWriteBuilder.scala | 14 +++---- .../datasources/v2/orc/OrcDataSourceV2.scala | 16 -------- .../datasources/v2/orc/OrcScan.scala | 6 --- .../datasources/v2/orc/OrcScanBuilder.scala | 2 +- .../datasources/v2/orc/OrcTable.scala | 21 +++++++++- .../datasources/v2/orc/OrcWriteBuilder.scala | 14 +++---- 13 files changed, 85 insertions(+), 110 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScan.scala index 6ab5c4b269b9..7203918f3c68 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScan.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScan.scala @@ -37,22 +37,6 @@ abstract class FileScan( false } - /** - * Returns whether this format supports the given [[DataType]] in write path. - * By default all data types are supported. - */ - def supportsDataType(dataType: DataType): Boolean = true - - /** - * The string that represents the format that this data source provider uses. This is - * overridden by children to provide a nice alias for the data source. For example: - * - * {{{ - * override def formatName(): String = "ORC" - * }}} - */ - def formatName: String - protected def partitions: Seq[FilePartition] = { val selectedPartitions = fileIndex.listFiles(Seq.empty, Seq.empty) val maxSplitBytes = FilePartition.maxSplitBytes(sparkSession, selectedPartitions) @@ -76,13 +60,5 @@ abstract class FileScan( partitions.toArray } - override def toBatch: Batch = { - readSchema.foreach { field => - if (!supportsDataType(field.dataType)) { - throw new AnalysisException( - s"$formatName data source does not support ${field.dataType.catalogString} data type.") - } - } - this - } + override def toBatch: Batch = this } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScanBuilder.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScanBuilder.scala index d4e55a50307d..b63e025b76d1 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScanBuilder.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScanBuilder.scala @@ -16,8 +16,8 @@ */ package org.apache.spark.sql.execution.datasources.v2 -import org.apache.spark.sql.sources.v2.reader.{ScanBuilder, SupportsPushDownFilters, SupportsPushDownRequiredColumns} -import org.apache.spark.sql.types.StructType +import org.apache.spark.sql.sources.v2.reader.{ScanBuilder, SupportsPushDownRequiredColumns} +import org.apache.spark.sql.types.{DataType, StructType} abstract class FileScanBuilder(schema: StructType) extends ScanBuilder diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileTable.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileTable.scala index 4b35df355b6e..459af2295b33 100644 --- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileTable.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileTable.scala @@ -24,7 +24,7 @@ import org.apache.spark.sql.{AnalysisException, SparkSession} import org.apache.spark.sql.execution.datasources._ import org.apache.spark.sql.sources.v2.{SupportsRead, SupportsWrite, Table, TableCapability} import org.apache.spark.sql.sources.v2.TableCapability._ -import org.apache.spark.sql.types.StructType +import org.apache.spark.sql.types.{DataType, StructType} import org.apache.spark.sql.util.CaseInsensitiveStringMap import org.apache.spark.sql.util.SchemaUtils @@ -46,17 +46,27 @@ abstract class FileTable( sparkSession, rootPathsSpecified, caseSensitiveMap, userSpecifiedSchema, fileStatusCache) } - lazy val dataSchema: StructType = userSpecifiedSchema.orElse { - inferSchema(fileIndex.allFiles()) - }.getOrElse { - throw new AnalysisException( - s"Unable to infer schema for $name. It must be specified manually.") - }.asNullable + lazy val dataSchema: StructType = userSpecifiedSchema.map { schema => + val partitionSchema = fileIndex.partitionSchema + val equality = sparkSession.sessionState.conf.resolver + StructType(schema.filterNot(f => partitionSchema.exists(p => equality(p.name, f.name)))) + }.orElse { + inferSchema(fileIndex.allFiles()) + }.getOrElse { + throw new AnalysisException( + s"Unable to infer schema for $name. It must be specified manually.") + }.asNullable override lazy val schema: StructType = { val caseSensitive = sparkSession.sessionState.conf.caseSensitiveAnalysis SchemaUtils.checkColumnNameDuplication(dataSchema.fieldNames, "in the data schema", caseSensitive) + dataSchema.foreach { field => + if (!supportsDataType(field.dataType)) { + throw new AnalysisException( + s"$name data source does not support ${field.dataType.catalogString} data type.") + } + } val partitionSchema = fileIndex.partitionSchema SchemaUtils.checkColumnNameDuplication(partitionSchema.fieldNames, "in the partition schema", caseSensitive) @@ -72,6 +82,22 @@ abstract class FileTable( * Spark will require that user specify the schema manually. */ def inferSchema(files: Seq[FileStatus]): Option[StructType] + + /** + * Returns whether this format supports the given [[DataType]] in write path. + * By default all data types are supported. + */ + def supportsDataType(dataType: DataType): Boolean = true + + /** + * The string that represents the format that this data source provider uses. This is + * overridden by children to provide a nice alias for the data source. 
For example: + * + * {{{ + * override def formatName(): String = "ORC" + * }}} + */ + def formatName: String } object FileTable { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileWriteBuilder.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileWriteBuilder.scala index bb4a428e4066..7ff5c4182d98 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileWriteBuilder.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileWriteBuilder.scala @@ -39,7 +39,11 @@ import org.apache.spark.sql.util.CaseInsensitiveStringMap import org.apache.spark.sql.util.SchemaUtils import org.apache.spark.util.SerializableConfiguration -abstract class FileWriteBuilder(options: CaseInsensitiveStringMap, paths: Seq[String]) +abstract class FileWriteBuilder( + options: CaseInsensitiveStringMap, + paths: Seq[String], + formatName: String, + supportsDataType: DataType => Boolean) extends WriteBuilder with SupportsSaveMode { private var schema: StructType = _ private var queryId: String = _ @@ -108,22 +112,6 @@ abstract class FileWriteBuilder(options: CaseInsensitiveStringMap, paths: Seq[St options: Map[String, String], dataSchema: StructType): OutputWriterFactory - /** - * Returns whether this format supports the given [[DataType]] in write path. - * By default all data types are supported. - */ - def supportsDataType(dataType: DataType): Boolean = true - - /** - * The string that represents the format that this data source provider uses. This is - * overridden by children to provide a nice alias for the data source. For example: - * - * {{{ - * override def formatName(): String = "ORC" - * }}} - */ - def formatName: String - private def validateInputs(caseSensitiveAnalysis: Boolean): Unit = { assert(schema != null, "Missing input data schema") assert(queryId != null, "Missing query ID") diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/csv/CSVDataSourceV2.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/csv/CSVDataSourceV2.scala index 4ecd9cdc32ac..4d3726262568 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/csv/CSVDataSourceV2.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/csv/CSVDataSourceV2.scala @@ -41,13 +41,3 @@ class CSVDataSourceV2 extends FileDataSourceV2 { CSVTable(tableName, sparkSession, options, paths, Some(schema)) } } - -object CSVDataSourceV2 { - def supportsDataType(dataType: DataType): Boolean = dataType match { - case _: AtomicType => true - - case udt: UserDefinedType[_] => supportsDataType(udt.sqlType) - - case _ => false - } -} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/csv/CSVScan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/csv/CSVScan.scala index 35c6a668f22a..8f2f8f256731 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/csv/CSVScan.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/csv/CSVScan.scala @@ -75,10 +75,4 @@ case class CSVScan( CSVPartitionReaderFactory(sparkSession.sessionState.conf, broadcastedConf, dataSchema, fileIndex.partitionSchema, readSchema, parsedOptions) } - - override def supportsDataType(dataType: DataType): Boolean = { - CSVDataSourceV2.supportsDataType(dataType) - } - - override def formatName: String = "CSV" } diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/csv/CSVTable.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/csv/CSVTable.scala index bf4b8ba868f2..852cbf07c350 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/csv/CSVTable.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/csv/CSVTable.scala @@ -25,7 +25,7 @@ import org.apache.spark.sql.catalyst.csv.CSVOptions import org.apache.spark.sql.execution.datasources.csv.CSVDataSource import org.apache.spark.sql.execution.datasources.v2.FileTable import org.apache.spark.sql.sources.v2.writer.WriteBuilder -import org.apache.spark.sql.types.StructType +import org.apache.spark.sql.types.{AtomicType, DataType, StructType, UserDefinedType} import org.apache.spark.sql.util.CaseInsensitiveStringMap case class CSVTable( @@ -48,5 +48,15 @@ case class CSVTable( } override def newWriteBuilder(options: CaseInsensitiveStringMap): WriteBuilder = - new CSVWriteBuilder(options, paths) + new CSVWriteBuilder(options, paths, formatName, supportsDataType) + + override def supportsDataType(dataType: DataType): Boolean = dataType match { + case _: AtomicType => true + + case udt: UserDefinedType[_] => supportsDataType(udt.sqlType) + + case _ => false + } + + override def formatName: String = "CSV" } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/csv/CSVWriteBuilder.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/csv/CSVWriteBuilder.scala index bb26d2f92d74..92b47e435480 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/csv/CSVWriteBuilder.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/csv/CSVWriteBuilder.scala @@ -27,8 +27,12 @@ import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types.{DataType, StructType} import org.apache.spark.sql.util.CaseInsensitiveStringMap -class CSVWriteBuilder(options: CaseInsensitiveStringMap, paths: Seq[String]) - extends FileWriteBuilder(options, paths) { +class CSVWriteBuilder( + options: CaseInsensitiveStringMap, + paths: Seq[String], + formatName: String, + supportsDataType: DataType => Boolean) + extends FileWriteBuilder(options, paths, formatName, supportsDataType) { override def prepareWrite( sqlConf: SQLConf, job: Job, @@ -56,10 +60,4 @@ class CSVWriteBuilder(options: CaseInsensitiveStringMap, paths: Seq[String]) } } } - - override def supportsDataType(dataType: DataType): Boolean = { - CSVDataSourceV2.supportsDataType(dataType) - } - - override def formatName: String = "CSV" } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcDataSourceV2.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcDataSourceV2.scala index 36e7e12e41ce..5d3c6113b732 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcDataSourceV2.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcDataSourceV2.scala @@ -42,19 +42,3 @@ class OrcDataSourceV2 extends FileDataSourceV2 { } } -object OrcDataSourceV2 { - def supportsDataType(dataType: DataType): Boolean = dataType match { - case _: AtomicType => true - - case st: StructType => st.forall { f => supportsDataType(f.dataType) } - - case ArrayType(elementType, _) => supportsDataType(elementType) - - case MapType(keyType, valueType, _) => - supportsDataType(keyType) && supportsDataType(valueType) - - case 
udt: UserDefinedType[_] => supportsDataType(udt.sqlType) - - case _ => false - } -} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcScan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcScan.scala index 237eadb698b4..59f52daa8bf5 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcScan.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcScan.scala @@ -43,10 +43,4 @@ case class OrcScan( OrcPartitionReaderFactory(sparkSession.sessionState.conf, broadcastedConf, dataSchema, fileIndex.partitionSchema, readSchema) } - - override def supportsDataType(dataType: DataType): Boolean = { - OrcDataSourceV2.supportsDataType(dataType) - } - - override def formatName: String = "ORC" } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcScanBuilder.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcScanBuilder.scala index 8ac56aa5f64b..8014b32c997a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcScanBuilder.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcScanBuilder.scala @@ -27,7 +27,7 @@ import org.apache.spark.sql.execution.datasources.orc.OrcFilters import org.apache.spark.sql.execution.datasources.v2.FileScanBuilder import org.apache.spark.sql.sources.Filter import org.apache.spark.sql.sources.v2.reader.{Scan, SupportsPushDownFilters} -import org.apache.spark.sql.types.StructType +import org.apache.spark.sql.types.{DataType, StructType} import org.apache.spark.sql.util.CaseInsensitiveStringMap case class OrcScanBuilder( diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcTable.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcTable.scala index aac38fb3fa1f..ace77b7c4d9a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcTable.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcTable.scala @@ -22,7 +22,7 @@ import org.apache.spark.sql.SparkSession import org.apache.spark.sql.execution.datasources.orc.OrcUtils import org.apache.spark.sql.execution.datasources.v2.FileTable import org.apache.spark.sql.sources.v2.writer.WriteBuilder -import org.apache.spark.sql.types.StructType +import org.apache.spark.sql.types._ import org.apache.spark.sql.util.CaseInsensitiveStringMap case class OrcTable( @@ -40,5 +40,22 @@ case class OrcTable( OrcUtils.readSchema(sparkSession, files) override def newWriteBuilder(options: CaseInsensitiveStringMap): WriteBuilder = - new OrcWriteBuilder(options, paths) + new OrcWriteBuilder(options, paths, formatName, supportsDataType) + + override def supportsDataType(dataType: DataType): Boolean = dataType match { + case _: AtomicType => true + + case st: StructType => st.forall { f => supportsDataType(f.dataType) } + + case ArrayType(elementType, _) => supportsDataType(elementType) + + case MapType(keyType, valueType, _) => + supportsDataType(keyType) && supportsDataType(valueType) + + case udt: UserDefinedType[_] => supportsDataType(udt.sqlType) + + case _ => false + } + + override def formatName: String = "ORC" } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcWriteBuilder.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcWriteBuilder.scala index 
829ab5fbe176..f5b06e11c8bd 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcWriteBuilder.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcWriteBuilder.scala @@ -28,8 +28,12 @@ import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types._ import org.apache.spark.sql.util.CaseInsensitiveStringMap -class OrcWriteBuilder(options: CaseInsensitiveStringMap, paths: Seq[String]) - extends FileWriteBuilder(options, paths) { +class OrcWriteBuilder( + options: CaseInsensitiveStringMap, + paths: Seq[String], + formatName: String, + supportsDataType: DataType => Boolean) + extends FileWriteBuilder(options, paths, formatName, supportsDataType) { override def prepareWrite( sqlConf: SQLConf, @@ -65,10 +69,4 @@ class OrcWriteBuilder(options: CaseInsensitiveStringMap, paths: Seq[String]) } } } - - override def supportsDataType(dataType: DataType): Boolean = { - OrcDataSourceV2.supportsDataType(dataType) - } - - override def formatName: String = "ORC" } From da5549376237b3ad4a168fd54830ce5adb93db8c Mon Sep 17 00:00:00 2001 From: Gengliang Wang Date: Mon, 25 Mar 2019 22:36:48 +0800 Subject: [PATCH 2/7] add test case --- .../datasources/v2/FileTableSuite.scala | 84 +++++++++++++++++++ 1 file changed, 84 insertions(+) create mode 100644 sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/FileTableSuite.scala diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/FileTableSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/FileTableSuite.scala new file mode 100644 index 000000000000..dceaf7fb308b --- /dev/null +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/FileTableSuite.scala @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.sql.execution.datasources.v2 + +import scala.collection.JavaConverters._ + +import org.apache.hadoop.fs.FileStatus + +import org.apache.spark.sql.{QueryTest, SparkSession} +import org.apache.spark.sql.sources.v2.reader.ScanBuilder +import org.apache.spark.sql.sources.v2.writer.WriteBuilder +import org.apache.spark.sql.test.{SharedSQLContext, SQLTestUtils} +import org.apache.spark.sql.types._ +import org.apache.spark.sql.util.CaseInsensitiveStringMap + +class DummyFileTable( + sparkSession: SparkSession, + options: CaseInsensitiveStringMap, + paths: Seq[String], + expectedDataSchema: StructType, + userSpecifiedSchema: Option[StructType]) + extends FileTable(sparkSession, options, paths, userSpecifiedSchema) { + override def inferSchema(files: Seq[FileStatus]): Option[StructType] = Some(expectedDataSchema) + + override def name(): String = "Dummy" + + override def formatName: String = "Dummy" + + override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder = null + + override def newWriteBuilder(options: CaseInsensitiveStringMap): WriteBuilder = null + + override def supportsDataType(dataType: DataType): Boolean = dataType == StringType +} + +class FileTableSuite extends QueryTest with SharedSQLContext with SQLTestUtils { + + test("Data type validation should check data schema only") { + withTempPath { dir => + val df = spark.createDataFrame(Seq(("a", 1), ("b", 2))).toDF("v", "p") + val pathName = dir.getCanonicalPath + df.write.partitionBy("p").text(pathName) + val options = new CaseInsensitiveStringMap(Map("path" -> pathName).asJava) + val expectedDataSchema = StructType(Seq(StructField("v", StringType, true))) + // DummyFileTable doesn't support Integer data type. + // However, the partition schema is handled by Spark, so it is allowed to contain + // Integer data type here. 
+ val table = new DummyFileTable(spark, options, Seq(pathName), expectedDataSchema, None) + assert(table.dataSchema == expectedDataSchema) + val expectedPartitionSchema = StructType(Seq(StructField("p", IntegerType, true))) + assert(table.fileIndex.partitionSchema == expectedPartitionSchema) + } + } + + test("Returns correct data schema when user specified schema contains partition schema") { + withTempPath { dir => + val df = spark.createDataFrame(Seq(("a", 1), ("b", 2))).toDF("v", "p") + val pathName = dir.getCanonicalPath + df.write.partitionBy("p").text(pathName) + val options = new CaseInsensitiveStringMap(Map("path" -> pathName).asJava) + val userSpecifiedSchema = Some(StructType(Seq( + StructField("v", StringType, true), + StructField("p", IntegerType, true)))) + val expectedDataSchema = StructType(Seq(StructField("v", StringType, true))) + val table = + new DummyFileTable(spark, options, Seq(pathName), expectedDataSchema, userSpecifiedSchema) + assert(table.dataSchema == expectedDataSchema) + } + } +} From 43743b38ccda80c84312b6810886c6402d6dc1e8 Mon Sep 17 00:00:00 2001 From: Gengliang Wang Date: Mon, 25 Mar 2019 22:55:26 +0800 Subject: [PATCH 3/7] undo unrelated changes --- .../spark/sql/execution/datasources/v2/FileScanBuilder.scala | 4 ++-- .../sql/execution/datasources/v2/orc/OrcScanBuilder.scala | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScanBuilder.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScanBuilder.scala index b63e025b76d1..d4e55a50307d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScanBuilder.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScanBuilder.scala @@ -16,8 +16,8 @@ */ package org.apache.spark.sql.execution.datasources.v2 -import org.apache.spark.sql.sources.v2.reader.{ScanBuilder, SupportsPushDownRequiredColumns} -import org.apache.spark.sql.types.{DataType, StructType} +import org.apache.spark.sql.sources.v2.reader.{ScanBuilder, SupportsPushDownFilters, SupportsPushDownRequiredColumns} +import org.apache.spark.sql.types.StructType abstract class FileScanBuilder(schema: StructType) extends ScanBuilder diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcScanBuilder.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcScanBuilder.scala index 8014b32c997a..8ac56aa5f64b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcScanBuilder.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcScanBuilder.scala @@ -27,7 +27,7 @@ import org.apache.spark.sql.execution.datasources.orc.OrcFilters import org.apache.spark.sql.execution.datasources.v2.FileScanBuilder import org.apache.spark.sql.sources.Filter import org.apache.spark.sql.sources.v2.reader.{Scan, SupportsPushDownFilters} -import org.apache.spark.sql.types.{DataType, StructType} +import org.apache.spark.sql.types.StructType import org.apache.spark.sql.util.CaseInsensitiveStringMap case class OrcScanBuilder( From 214bd8b743dfafdb48372c7a0030ed04e3d3f4ba Mon Sep 17 00:00:00 2001 From: Gengliang Wang Date: Tue, 26 Mar 2019 10:23:57 +0800 Subject: [PATCH 4/7] fix --- .../apache/spark/sql/execution/datasources/v2/FileTable.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileTable.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileTable.scala index 459af2295b33..a2c50e3329a3 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileTable.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileTable.scala @@ -64,7 +64,7 @@ abstract class FileTable( dataSchema.foreach { field => if (!supportsDataType(field.dataType)) { throw new AnalysisException( - s"$name data source does not support ${field.dataType.catalogString} data type.") + s"$formatName data source does not support ${field.dataType.catalogString} data type.") } } val partitionSchema = fileIndex.partitionSchema From d8b2638c539462a04820f80d8b8fee7ab43c3d1d Mon Sep 17 00:00:00 2001 From: Gengliang Wang Date: Tue, 26 Mar 2019 16:34:56 +0800 Subject: [PATCH 5/7] address comments --- .../execution/datasources/v2/FileScan.scala | 4 ++-- .../execution/datasources/v2/FileTable.scala | 18 +++++++++--------- .../datasources/v2/csv/CSVDataSourceV2.scala | 2 +- .../datasources/v2/orc/OrcDataSourceV2.scala | 2 +- .../execution/datasources/v2/orc/OrcScan.scala | 2 +- .../datasources/v2/FileTableSuite.scala | 2 +- 6 files changed, 15 insertions(+), 15 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScan.scala index 7203918f3c68..e971fd762efe 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScan.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScan.scala @@ -18,11 +18,11 @@ package org.apache.spark.sql.execution.datasources.v2 import org.apache.hadoop.fs.Path -import org.apache.spark.sql.{AnalysisException, SparkSession} +import org.apache.spark.sql.SparkSession import org.apache.spark.sql.execution.PartitionedFileUtil import org.apache.spark.sql.execution.datasources._ import org.apache.spark.sql.sources.v2.reader.{Batch, InputPartition, Scan} -import org.apache.spark.sql.types.{DataType, StructType} +import org.apache.spark.sql.types.StructType import org.apache.spark.sql.util.CaseInsensitiveStringMap abstract class FileScan( diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileTable.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileTable.scala index a2c50e3329a3..699a4981cc73 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileTable.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileTable.scala @@ -47,15 +47,15 @@ abstract class FileTable( } lazy val dataSchema: StructType = userSpecifiedSchema.map { schema => - val partitionSchema = fileIndex.partitionSchema - val equality = sparkSession.sessionState.conf.resolver - StructType(schema.filterNot(f => partitionSchema.exists(p => equality(p.name, f.name)))) - }.orElse { - inferSchema(fileIndex.allFiles()) - }.getOrElse { - throw new AnalysisException( - s"Unable to infer schema for $name. It must be specified manually.") - }.asNullable + val partitionSchema = fileIndex.partitionSchema + val equality = sparkSession.sessionState.conf.resolver + StructType(schema.filterNot(f => partitionSchema.exists(p => equality(p.name, f.name)))) + }.orElse { + inferSchema(fileIndex.allFiles()) + }.getOrElse { + throw new AnalysisException( + s"Unable to infer schema for $name. 
It must be specified manually.") + }.asNullable override lazy val schema: StructType = { val caseSensitive = sparkSession.sessionState.conf.caseSensitiveAnalysis diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/csv/CSVDataSourceV2.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/csv/CSVDataSourceV2.scala index 4d3726262568..55222c624d91 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/csv/CSVDataSourceV2.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/csv/CSVDataSourceV2.scala @@ -20,7 +20,7 @@ import org.apache.spark.sql.execution.datasources.FileFormat import org.apache.spark.sql.execution.datasources.csv.CSVFileFormat import org.apache.spark.sql.execution.datasources.v2.FileDataSourceV2 import org.apache.spark.sql.sources.v2.Table -import org.apache.spark.sql.types._ +import org.apache.spark.sql.types.StructType import org.apache.spark.sql.util.CaseInsensitiveStringMap class CSVDataSourceV2 extends FileDataSourceV2 { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcDataSourceV2.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcDataSourceV2.scala index 5d3c6113b732..e8b9e6c6f498 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcDataSourceV2.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcDataSourceV2.scala @@ -20,7 +20,7 @@ import org.apache.spark.sql.execution.datasources._ import org.apache.spark.sql.execution.datasources.orc.OrcFileFormat import org.apache.spark.sql.execution.datasources.v2._ import org.apache.spark.sql.sources.v2.Table -import org.apache.spark.sql.types._ +import org.apache.spark.sql.types.StructType import org.apache.spark.sql.util.CaseInsensitiveStringMap class OrcDataSourceV2 extends FileDataSourceV2 { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcScan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcScan.scala index 59f52daa8bf5..fc8a682b226c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcScan.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcScan.scala @@ -23,7 +23,7 @@ import org.apache.spark.sql.SparkSession import org.apache.spark.sql.execution.datasources.PartitioningAwareFileIndex import org.apache.spark.sql.execution.datasources.v2.FileScan import org.apache.spark.sql.sources.v2.reader.PartitionReaderFactory -import org.apache.spark.sql.types.{DataType, StructType} +import org.apache.spark.sql.types.StructType import org.apache.spark.sql.util.CaseInsensitiveStringMap import org.apache.spark.util.SerializableConfiguration diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/FileTableSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/FileTableSuite.scala index dceaf7fb308b..3d4f5640723e 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/FileTableSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/FileTableSuite.scala @@ -62,7 +62,7 @@ class FileTableSuite extends QueryTest with SharedSQLContext with SQLTestUtils { val table = new DummyFileTable(spark, options, Seq(pathName), expectedDataSchema, None) assert(table.dataSchema == expectedDataSchema) val expectedPartitionSchema = 
StructType(Seq(StructField("p", IntegerType, true))) - assert(table.fileIndex.partitionSchema == expectedPartitionSchema) + assert(table.fileIndex.partitionSchema == expectedPartitionSchema) } } From 63466b1d36c466578c273609e2faf00387b609fb Mon Sep 17 00:00:00 2001 From: Gengliang Wang Date: Tue, 26 Mar 2019 16:53:50 +0800 Subject: [PATCH 6/7] revise --- .../apache/spark/sql/execution/datasources/v2/FileTable.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileTable.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileTable.scala index 699a4981cc73..ce95095fb843 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileTable.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileTable.scala @@ -84,7 +84,7 @@ abstract class FileTable( def inferSchema(files: Seq[FileStatus]): Option[StructType] /** - * Returns whether this format supports the given [[DataType]] in write path. + * Returns whether this format supports the given [[DataType]] in read/write path. * By default all data types are supported. */ def supportsDataType(dataType: DataType): Boolean = true From 4ca742d60f402392609886adcd67d8318a110f8a Mon Sep 17 00:00:00 2001 From: Gengliang Wang Date: Wed, 27 Mar 2019 01:00:56 +0800 Subject: [PATCH 7/7] revise --- .../apache/spark/sql/execution/datasources/v2/FileTable.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileTable.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileTable.scala index ce95095fb843..188016c161a4 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileTable.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileTable.scala @@ -48,8 +48,8 @@ abstract class FileTable( lazy val dataSchema: StructType = userSpecifiedSchema.map { schema => val partitionSchema = fileIndex.partitionSchema - val equality = sparkSession.sessionState.conf.resolver - StructType(schema.filterNot(f => partitionSchema.exists(p => equality(p.name, f.name)))) + val resolver = sparkSession.sessionState.conf.resolver + StructType(schema.filterNot(f => partitionSchema.exists(p => resolver(p.name, f.name)))) }.orElse { inferSchema(fileIndex.allFiles()) }.getOrElse {
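
For context, a minimal sketch of what a concrete file source looks like against the refactored FileTable API from these patches. PlainTextTable, its StringType-only support rule, the "TEXT" format name, and the null builders are illustrative assumptions rather than part of the patch set; the overridden members mirror DummyFileTable in the new test suite and CSVTable/OrcTable above.

import org.apache.hadoop.fs.FileStatus

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.datasources.v2.FileTable
import org.apache.spark.sql.sources.v2.reader.ScanBuilder
import org.apache.spark.sql.sources.v2.writer.WriteBuilder
import org.apache.spark.sql.types.{DataType, StringType, StructField, StructType}
import org.apache.spark.sql.util.CaseInsensitiveStringMap

// Hypothetical source that exposes plain text files as a single string column.
class PlainTextTable(
    sparkSession: SparkSession,
    options: CaseInsensitiveStringMap,
    paths: Seq[String],
    userSpecifiedSchema: Option[StructType])
  extends FileTable(sparkSession, options, paths, userSpecifiedSchema) {

  override def name(): String = "PlainText"

  // Only consulted when the user did not supply a schema; partition columns
  // are discovered separately by the file index.
  override def inferSchema(files: Seq[FileStatus]): Option[StructType] =
    Some(StructType(Seq(StructField("value", StringType, nullable = true))))

  // After this refactoring the type check lives on the table and is applied
  // by FileTable.schema to dataSchema only, so an unsupported partition
  // column type (e.g. the integer column in the test above) no longer fails.
  override def supportsDataType(dataType: DataType): Boolean = dataType == StringType

  // Appears in the "... data source does not support ..." error message.
  override def formatName: String = "TEXT"

  // Builders elided for brevity (the test's DummyFileTable does the same); a
  // real source would pass formatName and supportsDataType to its
  // FileWriteBuilder, as CSVTable and OrcTable now do.
  override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder = null
  override def newWriteBuilder(options: CaseInsensitiveStringMap): WriteBuilder = null
}

Because FileTable.schema performs the supportsDataType validation once while resolving the table, the read path (FileScan.toBatch no longer re-checks field types) and the write path (FileWriteBuilder now receives the same predicate and format name) report unsupported types with a consistent formatName-based message.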