Commit a83d290
Passes Hive Metastore partitioning information to ParquetRelation2
- Rewires Parquet data source and the new data source write support
- Temporary solution for moving Parquet conversion to analysis phase. Although it works, it's so ugly... I duplicated the whole Analyzer in Hive Context. Have to fix this.
- Cleaner solution for Metastore Parquet table conversion
- Fixes compilation errors introduced during rebasing
- Minor cleanups
- Addresses @yhuai's comments
1 parent ed5f4bb commit a83d290

File tree

8 files changed, +529 -155 lines changed

sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala

Lines changed: 1 addition & 1 deletion
@@ -647,6 +647,6 @@ private[parquet] object FileSystemHelper {
         sys.error("ERROR: attempting to append to set of Parquet files and found file" +
           s"that does not match name pattern: $other")
       case _ => 0
-    }.reduceLeft((a, b) => if (a < b) b else a)
+    }.reduceOption(_ max _).getOrElse(0)
   }
 }
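
The change guards against an empty file list: on an empty collection, reduceLeft throws an UnsupportedOperationException, while reduceOption returns None, which getOrElse(0) turns into a safe default. A minimal standalone sketch of the difference, where taskIds is a made-up stand-in for the task IDs parsed from the part-file names:

// Stand-in for the task IDs parsed out of Parquet part-file names.
val taskIds: Seq[Int] = Seq.empty

// Old form: throws UnsupportedOperationException on an empty collection.
// taskIds.reduceLeft((a, b) => if (a < b) b else a)

// New form: reduceOption yields None when the collection is empty,
// so the max task ID defaults to 0 instead of crashing.
val maxTaskId = taskIds.reduceOption(_ max _).getOrElse(0)
assert(maxTaskId == 0)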

sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTest.scala

Lines changed: 25 additions & 2 deletions
@@ -23,8 +23,9 @@ import scala.reflect.ClassTag
 import scala.reflect.runtime.universe.TypeTag
 import scala.util.Try
 
-import org.apache.spark.sql.{DataFrame, SQLContext}
 import org.apache.spark.sql.catalyst.util
+import org.apache.spark.sql.SaveMode
+import org.apache.spark.sql.{DataFrame, SQLContext}
 import org.apache.spark.util.Utils
 
 /**
@@ -38,6 +39,7 @@ trait ParquetTest {
   val sqlContext: SQLContext
 
   import sqlContext._
+  import sqlContext.implicits.{localSeqToDataFrameHolder, rddToDataFrameHolder}
 
   protected def configuration = sparkContext.hadoopConfiguration
 
@@ -88,7 +90,6 @@
   protected def withParquetFile[T <: Product: ClassTag: TypeTag]
       (data: Seq[T])
       (f: String => Unit): Unit = {
-    import sqlContext.implicits._
     withTempPath { file =>
       sparkContext.parallelize(data).toDF().saveAsParquetFile(file.getCanonicalPath)
       f(file.getCanonicalPath)
@@ -125,4 +126,26 @@
       withTempTable(tableName)(f)
     }
   }
+
+  protected def makeParquetFile[T <: Product: ClassTag: TypeTag](
+      data: Seq[T], path: File): Unit = {
+    data.toDF().save(path.getCanonicalPath, "org.apache.spark.sql.parquet", SaveMode.Overwrite)
+  }
+
+  protected def makePartitionDir(
+      basePath: File,
+      defaultPartitionName: String,
+      partitionCols: (String, Any)*): File = {
+    val partNames = partitionCols.map { case (k, v) =>
+      val valueString = if (v == null || v == "") defaultPartitionName else v.toString
+      s"$k=$valueString"
+    }
+
+    val partDir = partNames.foldLeft(basePath) { (parent, child) =>
+      new File(parent, child)
+    }
+
+    assert(partDir.mkdirs(), s"Couldn't create directory $partDir")
+    partDir
+  }
 }
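
Together, the two new helpers let a test lay out a partitioned Parquet dataset on disk: makePartitionDir builds a Hive-style key=value directory chain, substituting the default partition name for null or empty values, and makeParquetFile writes the rows as a Parquet file into a given directory. A hypothetical sketch of how a suite mixing in ParquetTest might combine them; the case class, base path, and partition values are illustrative, not taken from the commit:

// Hypothetical usage inside a suite that mixes in ParquetTest.
import java.io.File

case class ParquetData(intField: Int, stringField: String)

val base = new File("/tmp/parquet-partition-example") // illustrative path

// Builds /tmp/parquet-partition-example/pi=1/ps=__HIVE_DEFAULT_PARTITION__;
// the null value falls back to the default partition name, as Hive does.
val partDir = makePartitionDir(
  base, defaultPartitionName = "__HIVE_DEFAULT_PARTITION__",
  "pi" -> 1, "ps" -> null)

// Writes ten rows as a Parquet file under the partition directory.
makeParquetFile((1 to 10).map(i => ParquetData(i, i.toString)), partDir)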
