From 0033ed2ef2013e3f4abb56f9bc47989575589518 Mon Sep 17 00:00:00 2001 From: jeanlyn Date: Mon, 5 Jan 2015 00:26:14 +0800 Subject: [PATCH 1/6] SPARK-5068: fix bug query data when path doesn't exists --- .../apache/spark/sql/hive/TableReader.scala | 6 ++- .../spark/sql/hive/QueryPartitionSuite.scala | 49 +++++++++++++++++++ 2 files changed, 54 insertions(+), 1 deletion(-) create mode 100644 sql/hive/src/test/scala/org/apache/spark/sql/hive/QueryPartitionSuite.scala diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala index effaa5a443512..6d2c9b2a02154 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala @@ -142,7 +142,11 @@ class HadoopTableReader( partitionToDeserializer: Map[HivePartition, Class[_ <: Deserializer]], filterOpt: Option[PathFilter]): RDD[Row] = { - val hivePartitionRDDs = partitionToDeserializer.map { case (partition, partDeserializer) => + val hivePartitionRDDs = partitionToDeserializer.filter{ case (partition, partDeserializer) => + val partPath = HiveShim.getDataLocationPath(partition) + val fs = partPath.getFileSystem(sc.hiveconf) + fs.exists(partPath) + }.map { case (partition, partDeserializer) => val partDesc = Utilities.getPartitionDesc(partition) val partPath = HiveShim.getDataLocationPath(partition) val inputPathStr = applyFilterIfNeeded(partPath, filterOpt) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/QueryPartitionSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/QueryPartitionSuite.scala new file mode 100644 index 0000000000000..576a9bfd61b6b --- /dev/null +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/QueryPartitionSuite.scala @@ -0,0 +1,49 @@ +package org.apache.spark.sql.hive + +import java.io.File + +import com.google.common.io.Files +import org.apache.spark.sql.{QueryTest, _} +import org.apache.spark.sql.hive.test.TestHive +/* Implicits */ +import org.apache.spark.sql.hive.test.TestHive._ + + +class QueryPartitionSuite extends QueryTest { + + test("SPARK-5068: query data when path doesn't exists"){ + val testData = TestHive.sparkContext.parallelize( + (1 to 10).map(i => TestData(i, i.toString))) + testData.registerTempTable("testData") + + val tmpDir = Files.createTempDir() + //create the table for test + sql(s"CREATE TABLE table_with_partition(key int,value string) PARTITIONED by (ds string) location '${tmpDir.toURI.toString}' ") + sql("INSERT OVERWRITE TABLE table_with_partition partition (ds='1') SELECT key,value FROM testData") + sql("INSERT OVERWRITE TABLE table_with_partition partition (ds='2') SELECT key,value FROM testData") + sql("INSERT OVERWRITE TABLE table_with_partition partition (ds='3') SELECT key,value FROM testData") + sql("INSERT OVERWRITE TABLE table_with_partition partition (ds='4') SELECT key,value FROM testData") + //test for the exist path + checkAnswer(sql("select key,value from table_with_partition"), + testData.collect.toSeq ++ testData.collect.toSeq ++ testData.collect.toSeq ++ testData.collect.toSeq) + + //delect the path of one partition + val folders = tmpDir.listFiles.filter(_.isDirectory).toList + def deleteAll(file:File){ + if(file.isDirectory()){ + for(f:File <-file.listFiles()){ + deleteAll(f); + } + } + file.delete(); + } + deleteAll(folders(0)) + + //test for the affter delete the path + checkAnswer(sql("select key,value from table_with_partition"), + testData.collect.toSeq ++ 
testData.collect.toSeq ++ testData.collect.toSeq) + + sql("DROP TABLE table_with_partition") + sql("DROP TABLE createAndInsertTest") + } +} From 1a65548ee2d1ddc8d0a65cae2b421307d9b6b252 Mon Sep 17 00:00:00 2001 From: jeanlyn Date: Wed, 28 Jan 2015 11:29:21 +0800 Subject: [PATCH 2/6] add the Licensed --- .../spark/sql/hive/QueryPartitionSuite.scala | 23 +++++++++++++++++-- 1 file changed, 21 insertions(+), 2 deletions(-) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/QueryPartitionSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/QueryPartitionSuite.scala index 576a9bfd61b6b..0562127109ff9 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/QueryPartitionSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/QueryPartitionSuite.scala @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.spark.sql.hive import java.io.File @@ -25,7 +42,8 @@ class QueryPartitionSuite extends QueryTest { sql("INSERT OVERWRITE TABLE table_with_partition partition (ds='4') SELECT key,value FROM testData") //test for the exist path checkAnswer(sql("select key,value from table_with_partition"), - testData.collect.toSeq ++ testData.collect.toSeq ++ testData.collect.toSeq ++ testData.collect.toSeq) + testData.toSchemaRDD.collect ++ testData.toSchemaRDD.collect + ++ testData.toSchemaRDD.collect ++ testData.toSchemaRDD.collect) //delect the path of one partition val folders = tmpDir.listFiles.filter(_.isDirectory).toList @@ -41,7 +59,8 @@ class QueryPartitionSuite extends QueryTest { //test for the affter delete the path checkAnswer(sql("select key,value from table_with_partition"), - testData.collect.toSeq ++ testData.collect.toSeq ++ testData.collect.toSeq) + testData.toSchemaRDD.collect ++ testData.toSchemaRDD.collect + ++ testData.toSchemaRDD.collect) sql("DROP TABLE table_with_partition") sql("DROP TABLE createAndInsertTest") From 76df33f443dcdaff97a0b8511a0fc656fef81fe2 Mon Sep 17 00:00:00 2001 From: jeanlyn Date: Mon, 2 Feb 2015 19:49:46 +0800 Subject: [PATCH 3/6] fix code style --- .../src/main/scala/org/apache/spark/sql/hive/TableReader.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala index 6d2c9b2a02154..3b505af1c5ef5 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala @@ -142,7 +142,7 @@ class HadoopTableReader( partitionToDeserializer: Map[HivePartition, Class[_ <: Deserializer]], filterOpt: Option[PathFilter]): RDD[Row] = { - val hivePartitionRDDs = partitionToDeserializer.filter{ case (partition, 
partDeserializer) => + val hivePartitionRDDs = partitionToDeserializer.filter { case (partition, partDeserializer) => val partPath = HiveShim.getDataLocationPath(partition) val fs = partPath.getFileSystem(sc.hiveconf) fs.exists(partPath) From 6958312bbd9d29c042294960d95ec0aaadaead9c Mon Sep 17 00:00:00 2001 From: Cheng Hao Date: Tue, 3 Feb 2015 22:01:36 -0800 Subject: [PATCH 4/6] Return empty row when table / partition path doesn't exist --- .../apache/spark/sql/hive/TableReader.scala | 161 ++++++++++-------- ...ed path-0-ebcdb90c1c5e72bc55cf545e9e35ca7d | 4 + ...ed path-0-ebcdb90c1c5e72bc55cf545e9e35ca7d | 3 + ...ed path-0-ebcdb90c1c5e72bc55cf545e9e35ca7d | 0 .../spark/sql/hive/QueryPartitionSuite.scala | 91 +++++----- 5 files changed, 147 insertions(+), 112 deletions(-) create mode 100644 sql/hive/src/test/resources/golden/SPARK-5068 scan partition with existed path-0-ebcdb90c1c5e72bc55cf545e9e35ca7d create mode 100644 sql/hive/src/test/resources/golden/SPARK-5068 scan partition with non-existed path-0-ebcdb90c1c5e72bc55cf545e9e35ca7d create mode 100644 sql/hive/src/test/resources/golden/SPARK-5068 scan table with non-existed path-0-ebcdb90c1c5e72bc55cf545e9e35ca7d diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala index 3b505af1c5ef5..a8530e6544c71 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala @@ -102,24 +102,28 @@ class HadoopTableReader( val broadcastedHiveConf = _broadcastedHiveConf val tablePath = hiveTable.getPath - val inputPathStr = applyFilterIfNeeded(tablePath, filterOpt) + applyFilterIfNeeded(tablePath, filterOpt) match { + case Some(inputPathStr) => + // logDebug("Table input: %s".format(tablePath)) + val ifc = hiveTable.getInputFormatClass + .asInstanceOf[java.lang.Class[InputFormat[Writable, Writable]]] - // logDebug("Table input: %s".format(tablePath)) - val ifc = hiveTable.getInputFormatClass - .asInstanceOf[java.lang.Class[InputFormat[Writable, Writable]]] - val hadoopRDD = createHadoopRdd(tableDesc, inputPathStr, ifc) + val hadoopRDD = createHadoopRdd (tableDesc, inputPathStr, ifc) - val attrsWithIndex = attributes.zipWithIndex - val mutableRow = new SpecificMutableRow(attributes.map(_.dataType)) + val attrsWithIndex = attributes.zipWithIndex + val mutableRow = new SpecificMutableRow (attributes.map (_.dataType) ) - val deserializedHadoopRDD = hadoopRDD.mapPartitions { iter => - val hconf = broadcastedHiveConf.value.value - val deserializer = deserializerClass.newInstance() - deserializer.initialize(hconf, tableDesc.getProperties) - HadoopTableReader.fillObject(iter, deserializer, attrsWithIndex, mutableRow) - } + val deserializedHadoopRDD = hadoopRDD.mapPartitions { + iter => + val hconf = broadcastedHiveConf.value.value + val deserializer = deserializerClass.newInstance () + deserializer.initialize (hconf, tableDesc.getProperties) + HadoopTableReader.fillObject (iter, deserializer, attrsWithIndex, mutableRow) + } - deserializedHadoopRDD + deserializedHadoopRDD + case None => new EmptyRDD[Row](sc.sparkContext) + } } override def makeRDDForPartitionedTable(partitions: Seq[HivePartition]): RDD[Row] = { @@ -142,60 +146,62 @@ class HadoopTableReader( partitionToDeserializer: Map[HivePartition, Class[_ <: Deserializer]], filterOpt: Option[PathFilter]): RDD[Row] = { - val hivePartitionRDDs = partitionToDeserializer.filter { case (partition, partDeserializer) => - val 
partPath = HiveShim.getDataLocationPath(partition) - val fs = partPath.getFileSystem(sc.hiveconf) - fs.exists(partPath) - }.map { case (partition, partDeserializer) => + val hivePartitionRDDs = partitionToDeserializer.flatMap { case (partition, partDeserializer) => val partDesc = Utilities.getPartitionDesc(partition) val partPath = HiveShim.getDataLocationPath(partition) - val inputPathStr = applyFilterIfNeeded(partPath, filterOpt) - val ifc = partDesc.getInputFileFormatClass - .asInstanceOf[java.lang.Class[InputFormat[Writable, Writable]]] - // Get partition field info - val partSpec = partDesc.getPartSpec - val partProps = partDesc.getProperties - - val partColsDelimited: String = partProps.getProperty(META_TABLE_PARTITION_COLUMNS) - // Partitioning columns are delimited by "/" - val partCols = partColsDelimited.trim().split("/").toSeq - // 'partValues[i]' contains the value for the partitioning column at 'partCols[i]'. - val partValues = if (partSpec == null) { - Array.fill(partCols.size)(new String) - } else { - partCols.map(col => new String(partSpec.get(col))).toArray - } - - // Create local references so that the outer object isn't serialized. - val tableDesc = relation.tableDesc - val broadcastedHiveConf = _broadcastedHiveConf - val localDeserializer = partDeserializer - val mutableRow = new SpecificMutableRow(attributes.map(_.dataType)) - - // Splits all attributes into two groups, partition key attributes and those that are not. - // Attached indices indicate the position of each attribute in the output schema. - val (partitionKeyAttrs, nonPartitionKeyAttrs) = - attributes.zipWithIndex.partition { case (attr, _) => - relation.partitionKeys.contains(attr) - } - - def fillPartitionKeys(rawPartValues: Array[String], row: MutableRow) = { - partitionKeyAttrs.foreach { case (attr, ordinal) => - val partOrdinal = relation.partitionKeys.indexOf(attr) - row(ordinal) = Cast(Literal(rawPartValues(partOrdinal)), attr.dataType).eval(null) - } - } - - // Fill all partition keys to the given MutableRow object - fillPartitionKeys(partValues, mutableRow) - - createHadoopRdd(tableDesc, inputPathStr, ifc).mapPartitions { iter => - val hconf = broadcastedHiveConf.value.value - val deserializer = localDeserializer.newInstance() - deserializer.initialize(hconf, partProps) - - // fill the non partition key attributes - HadoopTableReader.fillObject(iter, deserializer, nonPartitionKeyAttrs, mutableRow) + applyFilterIfNeeded(partPath, filterOpt) match { + case Some(inputPathStr) => + val ifc = partDesc.getInputFileFormatClass + .asInstanceOf[java.lang.Class[InputFormat[Writable, Writable]]] + // Get partition field info + val partSpec = partDesc.getPartSpec + val partProps = partDesc.getProperties + + val partColsDelimited: String = partProps.getProperty(META_TABLE_PARTITION_COLUMNS) + // Partitioning columns are delimited by "/" + val partCols = partColsDelimited.trim().split("/").toSeq + // 'partValues[i]' contains the value for the partitioning column at 'partCols[i]'. + val partValues = if (partSpec == null) { + Array.fill(partCols.size)(new String) + } else { + partCols.map(col => new String(partSpec.get(col))).toArray + } + + // Create local references so that the outer object isn't serialized. + val tableDesc = relation.tableDesc + val broadcastedHiveConf = _broadcastedHiveConf + val localDeserializer = partDeserializer + val mutableRow = new SpecificMutableRow(attributes.map(_.dataType)) + + // Splits all attributes into two groups, partition key attributes and those that are not. 
+ // Attached indices indicate the position of each attribute in the output schema. + val (partitionKeyAttrs, nonPartitionKeyAttrs) = + attributes.zipWithIndex.partition { + case (attr, _) => + relation.partitionKeys.contains(attr) + } + + def fillPartitionKeys(rawPartValues: Array[String], row: MutableRow) = { + partitionKeyAttrs.foreach { + case (attr, ordinal) => + val partOrdinal = relation.partitionKeys.indexOf(attr) + row(ordinal) = Cast(Literal(rawPartValues(partOrdinal)), attr.dataType).eval(null) + } + } + + // Fill all partition keys to the given MutableRow object + fillPartitionKeys(partValues, mutableRow) + + Some(createHadoopRdd(tableDesc, inputPathStr, ifc).mapPartitions { + iter => + val hconf = broadcastedHiveConf.value.value + val deserializer = localDeserializer.newInstance() + deserializer.initialize(hconf, partProps) + + // fill the non partition key attributes + HadoopTableReader.fillObject(iter, deserializer, nonPartitionKeyAttrs, mutableRow) + }) + case None => None } }.toSeq @@ -211,13 +217,22 @@ class HadoopTableReader( * If `filterOpt` is defined, then it will be used to filter files from `path`. These files are * returned in a single, comma-separated string. */ - private def applyFilterIfNeeded(path: Path, filterOpt: Option[PathFilter]): String = { - filterOpt match { - case Some(filter) => - val fs = path.getFileSystem(sc.hiveconf) - val filteredFiles = fs.listStatus(path, filter).map(_.getPath.toString) - filteredFiles.mkString(",") - case None => path.toString + private def applyFilterIfNeeded(path: Path, filterOpt: Option[PathFilter]): Option[String] = { + if (path.getFileSystem(sc.hiveconf).exists(path)) { + // if the file exists + filterOpt match { + case Some(filter) => + val fs = path.getFileSystem(sc.hiveconf) + val filteredFiles = fs.listStatus(path, filter).map(_.getPath.toString) + if (filteredFiles.length > 0) { + Some(filteredFiles.mkString(",")) + } else { + None + } + case None => Some(path.toString) + } + } else { + None } } diff --git a/sql/hive/src/test/resources/golden/SPARK-5068 scan partition with existed path-0-ebcdb90c1c5e72bc55cf545e9e35ca7d b/sql/hive/src/test/resources/golden/SPARK-5068 scan partition with existed path-0-ebcdb90c1c5e72bc55cf545e9e35ca7d new file mode 100644 index 0000000000000..407eb07739325 --- /dev/null +++ b/sql/hive/src/test/resources/golden/SPARK-5068 scan partition with existed path-0-ebcdb90c1c5e72bc55cf545e9e35ca7d @@ -0,0 +1,4 @@ +238 val_238 +238 val_238 +238 val_238 +238 val_238 diff --git a/sql/hive/src/test/resources/golden/SPARK-5068 scan partition with non-existed path-0-ebcdb90c1c5e72bc55cf545e9e35ca7d b/sql/hive/src/test/resources/golden/SPARK-5068 scan partition with non-existed path-0-ebcdb90c1c5e72bc55cf545e9e35ca7d new file mode 100644 index 0000000000000..161cc1f8dd03b --- /dev/null +++ b/sql/hive/src/test/resources/golden/SPARK-5068 scan partition with non-existed path-0-ebcdb90c1c5e72bc55cf545e9e35ca7d @@ -0,0 +1,3 @@ +238 val_238 +238 val_238 +238 val_238 diff --git a/sql/hive/src/test/resources/golden/SPARK-5068 scan table with non-existed path-0-ebcdb90c1c5e72bc55cf545e9e35ca7d b/sql/hive/src/test/resources/golden/SPARK-5068 scan table with non-existed path-0-ebcdb90c1c5e72bc55cf545e9e35ca7d new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/QueryPartitionSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/QueryPartitionSuite.scala index 0562127109ff9..67f461a4ef4ba 100644 --- 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/QueryPartitionSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/QueryPartitionSuite.scala @@ -17,52 +17,65 @@ package org.apache.spark.sql.hive -import java.io.File +import org.scalatest.BeforeAndAfter -import com.google.common.io.Files -import org.apache.spark.sql.{QueryTest, _} -import org.apache.spark.sql.hive.test.TestHive -/* Implicits */ import org.apache.spark.sql.hive.test.TestHive._ +import org.apache.spark.util.Utils +import org.apache.spark.sql.hive.execution.HiveComparisonTest +abstract class QueryPartitionSuite extends HiveComparisonTest with BeforeAndAfter { + protected val tmpDir = Utils.createTempDir() -class QueryPartitionSuite extends QueryTest { + override def beforeAll() { + sql( + s"""CREATE TABLE table_with_partition(key int,value string) + |PARTITIONED by (ds string) location '${tmpDir.toURI.toString}' + |""".stripMargin) + sql( + s"""INSERT OVERWRITE TABLE table_with_partition + | partition (ds='1') SELECT key,value FROM src LIMIT 1""".stripMargin) + sql( + s"""INSERT OVERWRITE TABLE table_with_partition + | partition (ds='2') SELECT key,value FROM src LIMIT 1""".stripMargin) + sql( + s"""INSERT OVERWRITE TABLE table_with_partition + | partition (ds='3') SELECT key,value FROM src LIMIT 1""".stripMargin) + sql( + s"""INSERT OVERWRITE TABLE table_with_partition + | partition (ds='4') SELECT key,value FROM src LIMIT 1""".stripMargin) + } - test("SPARK-5068: query data when path doesn't exists"){ - val testData = TestHive.sparkContext.parallelize( - (1 to 10).map(i => TestData(i, i.toString))) - testData.registerTempTable("testData") + override def afterAll() { + sql("DROP TABLE table_with_partition") + } +} - val tmpDir = Files.createTempDir() - //create the table for test - sql(s"CREATE TABLE table_with_partition(key int,value string) PARTITIONED by (ds string) location '${tmpDir.toURI.toString}' ") - sql("INSERT OVERWRITE TABLE table_with_partition partition (ds='1') SELECT key,value FROM testData") - sql("INSERT OVERWRITE TABLE table_with_partition partition (ds='2') SELECT key,value FROM testData") - sql("INSERT OVERWRITE TABLE table_with_partition partition (ds='3') SELECT key,value FROM testData") - sql("INSERT OVERWRITE TABLE table_with_partition partition (ds='4') SELECT key,value FROM testData") - //test for the exist path - checkAnswer(sql("select key,value from table_with_partition"), - testData.toSchemaRDD.collect ++ testData.toSchemaRDD.collect - ++ testData.toSchemaRDD.collect ++ testData.toSchemaRDD.collect) +class QueryPartitionSuite0 extends QueryPartitionSuite { + //test for the exist path + createQueryTest("SPARK-5068 scan partition with existed path", + "select key,value from table_with_partition", false) +} - //delect the path of one partition - val folders = tmpDir.listFiles.filter(_.isDirectory).toList - def deleteAll(file:File){ - if(file.isDirectory()){ - for(f:File <-file.listFiles()){ - deleteAll(f); - } - } - file.delete(); - } - deleteAll(folders(0)) +class QueryPartitionSuite1 extends QueryPartitionSuite { + override def beforeAll(): Unit = { + super.beforeAll() + //delete the one of the partition manually + val folders = tmpDir.listFiles.filter(_.isDirectory) + Utils.deleteRecursively(folders(0)) + } - //test for the affter delete the path - checkAnswer(sql("select key,value from table_with_partition"), - testData.toSchemaRDD.collect ++ testData.toSchemaRDD.collect - ++ testData.toSchemaRDD.collect) + //test for the after deleting the partition path + 
createQueryTest("SPARK-5068 scan partition with non-existed path", + "select key,value from table_with_partition", false) +} - sql("DROP TABLE table_with_partition") - sql("DROP TABLE createAndInsertTest") +class QueryPartitionSuite2 extends QueryPartitionSuite { + override def beforeAll(): Unit = { + super.beforeAll() + // delete the path of the table file + Utils.deleteRecursively(tmpDir) } -} + + createQueryTest("SPARK-5068 scan table with non-existed path", + "select key,value from table_with_partition", false) +} \ No newline at end of file From 1f033cd8901bd97c8a4677e284847a2e975c6987 Mon Sep 17 00:00:00 2001 From: Cheng Hao Date: Tue, 3 Feb 2015 22:26:41 -0800 Subject: [PATCH 5/6] Move the FileSystem variable as class member --- .../main/scala/org/apache/spark/sql/hive/TableReader.scala | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala index a8530e6544c71..0eed1bbd9f4b7 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala @@ -18,7 +18,7 @@ package org.apache.spark.sql.hive import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.fs.{Path, PathFilter} +import org.apache.hadoop.fs.{FileSystem, Path, PathFilter} import org.apache.hadoop.hive.conf.HiveConf import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants._ import org.apache.hadoop.hive.ql.exec.Utilities @@ -68,6 +68,8 @@ class HadoopTableReader( math.max(sc.hiveconf.getInt("mapred.map.tasks", 1), sc.sparkContext.defaultMinPartitions) } + @transient private lazy val fs = FileSystem.get(sc.hiveconf) + // TODO: set aws s3 credentials. private val _broadcastedHiveConf = @@ -218,11 +220,10 @@ class HadoopTableReader( * returned in a single, comma-separated string. */ private def applyFilterIfNeeded(path: Path, filterOpt: Option[PathFilter]): Option[String] = { - if (path.getFileSystem(sc.hiveconf).exists(path)) { + if (fs.exists(path)) { // if the file exists filterOpt match { case Some(filter) => - val fs = path.getFileSystem(sc.hiveconf) val filteredFiles = fs.listStatus(path, filter).map(_.getPath.toString) if (filteredFiles.length > 0) { Some(filteredFiles.mkString(",")) From d3a4d3c0b6b6f015f61a7d23b4401761f0b25a60 Mon Sep 17 00:00:00 2001 From: Cheng Hao Date: Wed, 4 Feb 2015 03:03:30 -0800 Subject: [PATCH 6/6] Fix the potential bug of referencing FileSystem --- .../main/scala/org/apache/spark/sql/hive/TableReader.scala | 3 +-- .../scala/org/apache/spark/sql/hive/QueryPartitionSuite.scala | 4 ++-- 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala index 0eed1bbd9f4b7..9457a388fc225 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala @@ -68,8 +68,6 @@ class HadoopTableReader( math.max(sc.hiveconf.getInt("mapred.map.tasks", 1), sc.sparkContext.defaultMinPartitions) } - @transient private lazy val fs = FileSystem.get(sc.hiveconf) - // TODO: set aws s3 credentials. private val _broadcastedHiveConf = @@ -220,6 +218,7 @@ class HadoopTableReader( * returned in a single, comma-separated string. 
*/ private def applyFilterIfNeeded(path: Path, filterOpt: Option[PathFilter]): Option[String] = { + val fs = path.getFileSystem(sc.hiveconf) if (fs.exists(path)) { // if the file exists filterOpt match { diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/QueryPartitionSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/QueryPartitionSuite.scala index 67f461a4ef4ba..c0b52d200cb61 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/QueryPartitionSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/QueryPartitionSuite.scala @@ -19,8 +19,8 @@ package org.apache.spark.sql.hive import org.scalatest.BeforeAndAfter -import org.apache.spark.sql.hive.test.TestHive._ import org.apache.spark.util.Utils +import org.apache.spark.sql.hive.test.TestHive._ import org.apache.spark.sql.hive.execution.HiveComparisonTest abstract class QueryPartitionSuite extends HiveComparisonTest with BeforeAndAfter { @@ -78,4 +78,4 @@ class QueryPartitionSuite2 extends QueryPartitionSuite { createQueryTest("SPARK-5068 scan table with non-existed path", "select key,value from table_with_partition", false) -} \ No newline at end of file +}