diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
index effaa5a443512..9457a388fc225 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
@@ -18,7 +18,7 @@
 package org.apache.spark.sql.hive
 
 import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.{Path, PathFilter}
+import org.apache.hadoop.fs.{FileSystem, Path, PathFilter}
 import org.apache.hadoop.hive.conf.HiveConf
 import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants._
 import org.apache.hadoop.hive.ql.exec.Utilities
@@ -102,24 +102,28 @@ class HadoopTableReader(
     val broadcastedHiveConf = _broadcastedHiveConf
 
     val tablePath = hiveTable.getPath
-    val inputPathStr = applyFilterIfNeeded(tablePath, filterOpt)
+    applyFilterIfNeeded(tablePath, filterOpt) match {
+      case Some(inputPathStr) =>
+        // logDebug("Table input: %s".format(tablePath))
+        val ifc = hiveTable.getInputFormatClass
+          .asInstanceOf[java.lang.Class[InputFormat[Writable, Writable]]]
 
-    // logDebug("Table input: %s".format(tablePath))
-    val ifc = hiveTable.getInputFormatClass
-      .asInstanceOf[java.lang.Class[InputFormat[Writable, Writable]]]
-    val hadoopRDD = createHadoopRdd(tableDesc, inputPathStr, ifc)
+        val hadoopRDD = createHadoopRdd (tableDesc, inputPathStr, ifc)
 
-    val attrsWithIndex = attributes.zipWithIndex
-    val mutableRow = new SpecificMutableRow(attributes.map(_.dataType))
+        val attrsWithIndex = attributes.zipWithIndex
+        val mutableRow = new SpecificMutableRow (attributes.map (_.dataType) )
 
-    val deserializedHadoopRDD = hadoopRDD.mapPartitions { iter =>
-      val hconf = broadcastedHiveConf.value.value
-      val deserializer = deserializerClass.newInstance()
-      deserializer.initialize(hconf, tableDesc.getProperties)
-      HadoopTableReader.fillObject(iter, deserializer, attrsWithIndex, mutableRow)
-    }
+        val deserializedHadoopRDD = hadoopRDD.mapPartitions {
+          iter =>
+            val hconf = broadcastedHiveConf.value.value
+            val deserializer = deserializerClass.newInstance ()
+            deserializer.initialize (hconf, tableDesc.getProperties)
+            HadoopTableReader.fillObject (iter, deserializer, attrsWithIndex, mutableRow)
+        }
 
-    deserializedHadoopRDD
+        deserializedHadoopRDD
+      case None => new EmptyRDD[Row](sc.sparkContext)
+    }
   }
 
   override def makeRDDForPartitionedTable(partitions: Seq[HivePartition]): RDD[Row] = {
@@ -142,56 +146,62 @@ class HadoopTableReader(
       partitionToDeserializer: Map[HivePartition,
      Class[_ <: Deserializer]],
       filterOpt: Option[PathFilter]): RDD[Row] = {
-    val hivePartitionRDDs = partitionToDeserializer.map { case (partition, partDeserializer) =>
+    val hivePartitionRDDs = partitionToDeserializer.flatMap { case (partition, partDeserializer) =>
       val partDesc = Utilities.getPartitionDesc(partition)
       val partPath = HiveShim.getDataLocationPath(partition)
-      val inputPathStr = applyFilterIfNeeded(partPath, filterOpt)
-      val ifc = partDesc.getInputFileFormatClass
-        .asInstanceOf[java.lang.Class[InputFormat[Writable, Writable]]]
-      // Get partition field info
-      val partSpec = partDesc.getPartSpec
-      val partProps = partDesc.getProperties
-
-      val partColsDelimited: String = partProps.getProperty(META_TABLE_PARTITION_COLUMNS)
-      // Partitioning columns are delimited by "/"
-      val partCols = partColsDelimited.trim().split("/").toSeq
-      // 'partValues[i]' contains the value for the partitioning column at 'partCols[i]'.
-      val partValues = if (partSpec == null) {
-        Array.fill(partCols.size)(new String)
-      } else {
-        partCols.map(col => new String(partSpec.get(col))).toArray
-      }
-
-      // Create local references so that the outer object isn't serialized.
-      val tableDesc = relation.tableDesc
-      val broadcastedHiveConf = _broadcastedHiveConf
-      val localDeserializer = partDeserializer
-      val mutableRow = new SpecificMutableRow(attributes.map(_.dataType))
-
-      // Splits all attributes into two groups, partition key attributes and those that are not.
-      // Attached indices indicate the position of each attribute in the output schema.
-      val (partitionKeyAttrs, nonPartitionKeyAttrs) =
-        attributes.zipWithIndex.partition { case (attr, _) =>
-          relation.partitionKeys.contains(attr)
-        }
-
-      def fillPartitionKeys(rawPartValues: Array[String], row: MutableRow) = {
-        partitionKeyAttrs.foreach { case (attr, ordinal) =>
-          val partOrdinal = relation.partitionKeys.indexOf(attr)
-          row(ordinal) = Cast(Literal(rawPartValues(partOrdinal)), attr.dataType).eval(null)
-        }
-      }
-
-      // Fill all partition keys to the given MutableRow object
-      fillPartitionKeys(partValues, mutableRow)
-
-      createHadoopRdd(tableDesc, inputPathStr, ifc).mapPartitions { iter =>
-        val hconf = broadcastedHiveConf.value.value
-        val deserializer = localDeserializer.newInstance()
-        deserializer.initialize(hconf, partProps)
-
-        // fill the non partition key attributes
-        HadoopTableReader.fillObject(iter, deserializer, nonPartitionKeyAttrs, mutableRow)
+      applyFilterIfNeeded(partPath, filterOpt) match {
+        case Some(inputPathStr) =>
+          val ifc = partDesc.getInputFileFormatClass
+            .asInstanceOf[java.lang.Class[InputFormat[Writable, Writable]]]
+          // Get partition field info
+          val partSpec = partDesc.getPartSpec
+          val partProps = partDesc.getProperties
+
+          val partColsDelimited: String = partProps.getProperty(META_TABLE_PARTITION_COLUMNS)
+          // Partitioning columns are delimited by "/"
+          val partCols = partColsDelimited.trim().split("/").toSeq
+          // 'partValues[i]' contains the value for the partitioning column at 'partCols[i]'.
+          val partValues = if (partSpec == null) {
+            Array.fill(partCols.size)(new String)
+          } else {
+            partCols.map(col => new String(partSpec.get(col))).toArray
+          }
+
+          // Create local references so that the outer object isn't serialized.
+          val tableDesc = relation.tableDesc
+          val broadcastedHiveConf = _broadcastedHiveConf
+          val localDeserializer = partDeserializer
+          val mutableRow = new SpecificMutableRow(attributes.map(_.dataType))
+
+          // Splits all attributes into two groups, partition key attributes and those that are not.
+          // Attached indices indicate the position of each attribute in the output schema.
+          val (partitionKeyAttrs, nonPartitionKeyAttrs) =
+            attributes.zipWithIndex.partition {
+              case (attr, _) =>
+                relation.partitionKeys.contains(attr)
+            }
+
+          def fillPartitionKeys(rawPartValues: Array[String], row: MutableRow) = {
+            partitionKeyAttrs.foreach {
+              case (attr, ordinal) =>
+                val partOrdinal = relation.partitionKeys.indexOf(attr)
+                row(ordinal) = Cast(Literal(rawPartValues(partOrdinal)), attr.dataType).eval(null)
+            }
+          }
+
+          // Fill all partition keys to the given MutableRow object
+          fillPartitionKeys(partValues, mutableRow)
+
+          Some(createHadoopRdd(tableDesc, inputPathStr, ifc).mapPartitions {
+            iter =>
+              val hconf = broadcastedHiveConf.value.value
+              val deserializer = localDeserializer.newInstance()
+              deserializer.initialize(hconf, partProps)
+
+              // fill the non partition key attributes
+              HadoopTableReader.fillObject(iter, deserializer, nonPartitionKeyAttrs, mutableRow)
+          })
+        case None => None
       }
     }.toSeq
 
@@ -207,13 +217,22 @@ class HadoopTableReader(
    * If `filterOpt` is defined, then it will be used to filter files from `path`. These files are
    * returned in a single, comma-separated string.
    */
-  private def applyFilterIfNeeded(path: Path, filterOpt: Option[PathFilter]): String = {
-    filterOpt match {
-      case Some(filter) =>
-        val fs = path.getFileSystem(sc.hiveconf)
-        val filteredFiles = fs.listStatus(path, filter).map(_.getPath.toString)
-        filteredFiles.mkString(",")
-      case None => path.toString
+  private def applyFilterIfNeeded(path: Path, filterOpt: Option[PathFilter]): Option[String] = {
+    val fs = path.getFileSystem(sc.hiveconf)
+    if (fs.exists(path)) {
+      // the path exists on the filesystem
+      filterOpt match {
+        case Some(filter) =>
+          val filteredFiles = fs.listStatus(path, filter).map(_.getPath.toString)
+          if (filteredFiles.length > 0) {
+            Some(filteredFiles.mkString(","))
+          } else {
+            None
+          }
+        case None => Some(path.toString)
+      }
+    } else {
+      None
     }
   }
 
diff --git a/sql/hive/src/test/resources/golden/SPARK-5068 scan partition with existed path-0-ebcdb90c1c5e72bc55cf545e9e35ca7d b/sql/hive/src/test/resources/golden/SPARK-5068 scan partition with existed path-0-ebcdb90c1c5e72bc55cf545e9e35ca7d
new file mode 100644
index 0000000000000..407eb07739325
--- /dev/null
+++ b/sql/hive/src/test/resources/golden/SPARK-5068 scan partition with existed path-0-ebcdb90c1c5e72bc55cf545e9e35ca7d
@@ -0,0 +1,4 @@
+238	val_238
+238	val_238
+238	val_238
+238	val_238
diff --git a/sql/hive/src/test/resources/golden/SPARK-5068 scan partition with non-existed path-0-ebcdb90c1c5e72bc55cf545e9e35ca7d b/sql/hive/src/test/resources/golden/SPARK-5068 scan partition with non-existed path-0-ebcdb90c1c5e72bc55cf545e9e35ca7d
new file mode 100644
index 0000000000000..161cc1f8dd03b
--- /dev/null
+++ b/sql/hive/src/test/resources/golden/SPARK-5068 scan partition with non-existed path-0-ebcdb90c1c5e72bc55cf545e9e35ca7d
@@ -0,0 +1,3 @@
+238	val_238
+238	val_238
+238	val_238
diff --git a/sql/hive/src/test/resources/golden/SPARK-5068 scan table with non-existed path-0-ebcdb90c1c5e72bc55cf545e9e35ca7d b/sql/hive/src/test/resources/golden/SPARK-5068 scan table with non-existed path-0-ebcdb90c1c5e72bc55cf545e9e35ca7d
new file mode 100644
index 0000000000000..e69de29bb2d1d
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/QueryPartitionSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/QueryPartitionSuite.scala
new file mode 100644
index 0000000000000..c0b52d200cb61
--- /dev/null
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/QueryPartitionSuite.scala
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive
+
+import org.scalatest.BeforeAndAfter
+
+import org.apache.spark.util.Utils
+import org.apache.spark.sql.hive.test.TestHive._
+import org.apache.spark.sql.hive.execution.HiveComparisonTest
+
+abstract class QueryPartitionSuite extends HiveComparisonTest with BeforeAndAfter {
+  protected val tmpDir = Utils.createTempDir()
+
+  override def beforeAll() {
+    sql(
+      s"""CREATE TABLE table_with_partition(key int,value string)
+        |PARTITIONED by (ds string) location '${tmpDir.toURI.toString}'
+        |""".stripMargin)
+    sql(
+      s"""INSERT OVERWRITE TABLE table_with_partition
+        | partition (ds='1') SELECT key,value FROM src LIMIT 1""".stripMargin)
+    sql(
+      s"""INSERT OVERWRITE TABLE table_with_partition
+        | partition (ds='2') SELECT key,value FROM src LIMIT 1""".stripMargin)
+    sql(
+      s"""INSERT OVERWRITE TABLE table_with_partition
+        | partition (ds='3') SELECT key,value FROM src LIMIT 1""".stripMargin)
+    sql(
+      s"""INSERT OVERWRITE TABLE table_with_partition
+        | partition (ds='4') SELECT key,value FROM src LIMIT 1""".stripMargin)
+  }
+
+  override def afterAll() {
+    sql("DROP TABLE table_with_partition")
+  }
+}
+
+class QueryPartitionSuite0 extends QueryPartitionSuite {
+  // test scanning a table whose partition paths all exist
+  createQueryTest("SPARK-5068 scan partition with existed path",
+    "select key,value from table_with_partition", false)
+}
+
+class QueryPartitionSuite1 extends QueryPartitionSuite {
+  override def beforeAll(): Unit = {
+    super.beforeAll()
+    // manually delete one of the partition directories
+    val folders = tmpDir.listFiles.filter(_.isDirectory)
+    Utils.deleteRecursively(folders(0))
+  }
+
+  // test scanning after a partition path has been deleted
+  createQueryTest("SPARK-5068 scan partition with non-existed path",
+    "select key,value from table_with_partition", false)
+}
+
+class QueryPartitionSuite2 extends QueryPartitionSuite {
+  override def beforeAll(): Unit = {
+    super.beforeAll()
+    // delete the whole table directory
+    Utils.deleteRecursively(tmpDir)
+  }
+
+  createQueryTest("SPARK-5068 scan table with non-existed path",
+    "select key,value from table_with_partition", false)
+}
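
Note on the change above (not part of the patch): applyFilterIfNeeded now returns Option[String] instead of String. None means the table or partition directory no longer exists on the filesystem, or that the supplied PathFilter matched no files; makeRDDForTable maps that case to an EmptyRDD, and makeRDDForPartitionedTable simply drops the partition by switching from map to flatMap. The snippet below is a minimal, standalone sketch of that contract against the plain Hadoop FileSystem API; PathResolutionSketch and resolveInputPath are hypothetical names used only for illustration.

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{Path, PathFilter}

object PathResolutionSketch {
  // Returns the comma-separated list of input paths for a table or partition
  // directory, or None when there is nothing to read.
  def resolveInputPath(
      path: Path,
      conf: Configuration,
      filterOpt: Option[PathFilter]): Option[String] = {
    val fs = path.getFileSystem(conf)
    if (!fs.exists(path)) {
      // Directory dropped on the filesystem but still registered in the metastore:
      // report "no input" instead of letting the Hadoop job fail later.
      None
    } else {
      filterOpt match {
        case Some(filter) =>
          // Keep only the files accepted by the filter; an empty listing is
          // treated the same way as a missing directory.
          val files = fs.listStatus(path, filter).map(_.getPath.toString)
          if (files.nonEmpty) Some(files.mkString(",")) else None
        case None =>
          Some(path.toString)
      }
    }
  }
}

This is also why the golden results above differ: the dropped-partition test returns one fewer row than the intact-table test, and the dropped-table test returns no rows at all.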