From c4fa70fa263582dc5fa8c5b5cc1900ad59d3f831 Mon Sep 17 00:00:00 2001
From: Thu Kyaw
Date: Sat, 9 May 2015 01:20:26 -0400
Subject: [PATCH] [SPARK-3928][SQL] Support wildcard matches on Parquet files
 for newParquet implementation.

---
 sql/core/pom.xml                                   |  2 +-
 .../apache/spark/sql/parquet/newParquet.scala      | 17 ++++----
 .../spark/sql/parquet/ParquetIOSuite.scala         | 42 +++++++++++++++++++
 3 files changed, 53 insertions(+), 8 deletions(-)

diff --git a/sql/core/pom.xml b/sql/core/pom.xml
index e3a6b1fe72435..6a7151ad10390 100644
--- a/sql/core/pom.xml
+++ b/sql/core/pom.xml
@@ -64,7 +64,7 @@
     <dependency>
      <groupId>com.fasterxml.jackson.core</groupId>
      <artifactId>jackson-databind</artifactId>
-      <version>2.3.0</version>
+      <version>2.4.4</version>
    </dependency>
    <dependency>
      <groupId>org.jodd</groupId>
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
index 85e60733bc57a..9e343199a4e4b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
@@ -265,17 +265,20 @@ private[sql] case class ParquetRelation2(
   def refresh(): Unit = {
     // Support either reading a collection of raw Parquet part-files, or a collection of folders
     // containing Parquet files (e.g. partitioned Parquet table).
-    val baseStatuses = paths.distinct.map { p =>
+    val baseStatuses = paths.distinct.flatMap { p =>
       val fs = FileSystem.get(URI.create(p), sparkContext.hadoopConfiguration)
       val path = new Path(p)
       val qualified = path.makeQualified(fs.getUri, fs.getWorkingDirectory)
-
-      if (!fs.exists(qualified) && maybeSchema.isDefined) {
-        fs.mkdirs(qualified)
-        prepareMetadata(qualified, maybeSchema.get, sparkContext.hadoopConfiguration)
+      val matchStatuses = fs.globStatus(qualified)
+      if (matchStatuses != null && matchStatuses.size > 0) {
+        matchStatuses
+      } else {
+        if (!fs.exists(qualified) && maybeSchema.isDefined) {
+          fs.mkdirs(qualified)
+          prepareMetadata(qualified, maybeSchema.get, sparkContext.hadoopConfiguration)
+        }
+        List(fs.getFileStatus(qualified))
       }
-
-      fs.getFileStatus(qualified)
     }.toArray

     assert(baseStatuses.forall(!_.isDir) || baseStatuses.forall(_.isDir))
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetIOSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetIOSuite.scala
index b504842053690..53eea970ee1f9 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetIOSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetIOSuite.scala
@@ -288,6 +288,48 @@ class ParquetIOSuiteBase extends QueryTest with ParquetTest {
     }
   }

+  test("wildcard read raw Parquet file") {
+    def makeRawParquetFile(path: Path): Unit = {
+      val schema = MessageTypeParser.parseMessageType(
+        """
+          |message root {
+          |  required boolean _1;
+          |  required int32 _2;
+          |  required int64 _3;
+          |  required float _4;
+          |  required double _5;
+          |}
+        """.stripMargin)
+
+      val writeSupport = new TestGroupWriteSupport(schema)
+      val writer = new ParquetWriter[Group](path, writeSupport)
+
+      (0 until 10).foreach { i =>
+        val record = new SimpleGroup(schema)
+        record.add(0, i % 2 == 0)
+        record.add(1, i)
+        record.add(2, i.toLong)
+        record.add(3, i.toFloat)
+        record.add(4, i.toDouble)
+        writer.write(record)
+      }
+
+      writer.close()
+    }
+
+    withTempDir { dir =>
+      val path = new Path(dir.toURI.toString, "part-r-0.parquet")
+      makeRawParquetFile(path)
+      val globPath = dir.toURI.toString
+      val globPatterns = Array(globPath + "/*", globPath + "/*.parquet", globPath + "/?art-?-0.*")
+      globPatterns.foreach { pattern =>
+        checkAnswer(parquetFile(pattern), (0 until 10).map { i =>
+          Row(i % 2 == 0, i, i.toLong, i.toFloat, i.toDouble)
+        })
+      }
+    }
+  }
+
   test("write metadata") {
     withTempPath { file =>
       val path = new Path(file.toURI.toString)
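
A usage note, not part of the patch: with this change, any path handed to the
newParquet reader may contain Hadoop glob wildcards (*, ?, [abc], {ab,cd}),
since the matching is delegated to FileSystem.globStatus rather than
reimplemented in Spark. A minimal sketch, assuming a Spark 1.3-era SQLContext
in scope as sqlContext and a purely hypothetical HDFS layout under /data/events:

    // Read every part-file across all date partitions in one call.
    // If the glob matches nothing, refresh() falls back to treating the
    // string as a literal path, as in the else branch above.
    val df = sqlContext.parquetFile("hdfs:///data/events/date=2015-*/part-*.parquet")
    df.registerTempTable("events")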