Skip to content

Commit ec9950c

Browse files
committed
Migrates Parquet data source to FSBasedRelation
1 parent 0595b6d commit ec9950c

File tree

14 files changed

+817
-1025
lines changed

14 files changed

+817
-1025
lines changed

sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala

Lines changed: 7 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -27,9 +27,11 @@ import scala.reflect.runtime.universe.TypeTag
2727
import scala.util.control.NonFatal
2828

2929
import com.google.common.reflect.TypeToken
30+
import org.apache.hadoop.fs.Path
3031

3132
import org.apache.spark.annotation.{DeveloperApi, Experimental}
3233
import org.apache.spark.api.java.{JavaRDD, JavaSparkContext}
34+
import org.apache.spark.deploy.SparkHadoopUtil
3335
import org.apache.spark.rdd.RDD
3436
import org.apache.spark.sql.catalyst.analysis._
3537
import org.apache.spark.sql.catalyst.expressions._
@@ -42,6 +44,7 @@ import org.apache.spark.sql.catalyst.{CatalystTypeConverters, ScalaReflection, e
4244
import org.apache.spark.sql.execution.{Filter, _}
4345
import org.apache.spark.sql.jdbc.{JDBCPartition, JDBCPartitioningInfo, JDBCRelation}
4446
import org.apache.spark.sql.json._
47+
import org.apache.spark.sql.parquet.FSBasedParquetRelation
4548
import org.apache.spark.sql.sources._
4649
import org.apache.spark.sql.types._
4750
import org.apache.spark.util.Utils
@@ -597,7 +600,10 @@ class SQLContext(@transient val sparkContext: SparkContext)
597600
if (paths.isEmpty) {
598601
emptyDataFrame
599602
} else if (conf.parquetUseDataSourceApi) {
600-
baseRelationToDataFrame(parquet.ParquetRelation2(paths, Map.empty)(this))
603+
val globbedPaths = paths.map(new Path(_)).flatMap(SparkHadoopUtil.get.globPath).toArray
604+
baseRelationToDataFrame(
605+
new FSBasedParquetRelation(
606+
globbedPaths.map(_.toString), None, None, Map.empty[String, String])(this))
601607
} else {
602608
DataFrame(this, parquet.ParquetRelation(
603609
paths.mkString(","), Some(sparkContext.hadoopConfiguration), this))

sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetFilters.scala

Lines changed: 167 additions & 111 deletions
Large diffs are not rendered by default.

sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -674,7 +674,7 @@ private[parquet] object FileSystemHelper {
674674
def findMaxTaskId(pathStr: String, conf: Configuration): Int = {
675675
val files = FileSystemHelper.listFiles(pathStr, conf)
676676
// filename pattern is part-r-<int>.parquet
677-
val nameP = new scala.util.matching.Regex("""part-r-(\d{1,}).parquet""", "taskid")
677+
val nameP = new scala.util.matching.Regex("""part-.-(\d{1,}).*""", "taskid")
678678
val hiddenFileP = new scala.util.matching.Regex("_.*")
679679
files.map(_.getName).map {
680680
case nameP(taskid) => taskid.toInt

0 commit comments

Comments (0)