Commit 0d8ec1d

Adds more test cases
1 parent b35c8c6 commit 0d8ec1d

4 files changed: +177 additions, -9 deletions

sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala

Lines changed: 12 additions & 8 deletions

@@ -508,7 +508,7 @@ object ParquetRelation2 {
       finished = maybeColumn.isEmpty || chopped.isRoot
     }
 
-    val (columnNames, values) = columns.unzip
+    val (columnNames, values) = columns.reverse.unzip
     PartitionValues(columnNames, values)
   }
 
@@ -520,8 +520,12 @@ object ParquetRelation2 {
       None
     } else {
       val columnName = columnSpec.take(equalSignIndex)
-      val literal = inferPartitionColumnValue(
-        columnSpec.drop(equalSignIndex + 1), defaultPartitionName)
+      assert(columnName.nonEmpty, s"Empty partition column name in '$columnSpec'")
+
+      val rawColumnValue = columnSpec.drop(equalSignIndex + 1)
+      assert(rawColumnValue.nonEmpty, s"Empty partition column value in '$columnSpec'")
+
+      val literal = inferPartitionColumnValue(rawColumnValue, defaultPartitionName)
       Some(columnName -> literal)
     }
   }

@@ -536,9 +540,9 @@ object ParquetRelation2 {
   *   StringType
   * }}}
   */
-  private[parquet] def resolvePartitions(descs: Seq[PartitionValues]): Seq[PartitionValues] = {
-    val distinctColNamesOfPartitions = descs.map(_.columnNames).distinct
-    val columnCount = descs.head.columnNames.size
+  private[parquet] def resolvePartitions(values: Seq[PartitionValues]): Seq[PartitionValues] = {
+    val distinctColNamesOfPartitions = values.map(_.columnNames).distinct
+    val columnCount = values.head.columnNames.size
 
     // Column names of all partitions must match
     assert(distinctColNamesOfPartitions.size == 1, {

@@ -548,11 +552,11 @@ object ParquetRelation2 {
 
     // Resolves possible type conflicts for each column
     val resolvedValues = (0 until columnCount).map { i =>
-      resolveTypeConflicts(descs.map(_.literals(i)))
+      resolveTypeConflicts(values.map(_.literals(i)))
     }
 
     // Fills resolved literals back to each partition
-    descs.zipWithIndex.map { case (d, index) =>
+    values.zipWithIndex.map { case (d, index) =>
       d.copy(literals = resolvedValues.map(_(index)))
     }
   }
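Why the new .reverse matters: the surrounding loop appears to chop the path from the leaf directory toward the root (finished = maybeColumn.isEmpty || chopped.isRoot), so column/value pairs accumulate in leaf-to-root order. A standalone sketch of that effect, with a hypothetical leaf-to-root walk (none of these names are Spark's):

import scala.collection.mutable.ArrayBuffer

object ReverseUnzipSketch extends App {
  // Hypothetical directory segments of ".../a=10/b=hello/c=1.5".
  var remaining = List("base", "a=10", "b=hello", "c=1.5")
  val columns = ArrayBuffer.empty[(String, String)]
  var finished = false

  // Walk from the leaf directory toward the root, as the loop in
  // parsePartition appears to do.
  while (!finished) {
    val maybeColumn = remaining.last.split('=') match {
      case Array(name, value) => Some(name -> value)
      case _                  => None
    }
    maybeColumn.foreach(columns += _) // appended in leaf-to-root order
    remaining = remaining.init
    finished = maybeColumn.isEmpty || remaining.isEmpty
  }

  // columns is now ("c", "1.5"), ("b", "hello"), ("a", "10"); without the
  // .reverse, the column order would be flipped relative to the on-disk layout.
  val (columnNames, values) = columns.reverse.unzip
  assert(columnNames == Seq("a", "b", "c"))
}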

sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetFilterSuite.scala

Lines changed: 2 additions & 1 deletion

@@ -257,7 +257,8 @@ class ParquetFilterSuite extends QueryTest with ParquetTest {
       '_1.isNotNull, classOf[NotEq[_]], (1 to 4).map(i => Row.apply(i.toString)))
 
     checkFilterPredicate('_1 === "1", classOf[Eq[_]], "1")
-    checkFilterPredicate('_1 !== "1", classOf[NotEq[_]], (2 to 4).map(i => Row.apply(i.toString)))
+    checkFilterPredicate(
+      '_1 !== "1", classOf[NotEq[_]], (2 to 4).map(i => Row.apply(i.toString)))
 
     checkFilterPredicate('_1 < "2", classOf[Lt[_]], "1")
     checkFilterPredicate('_1 > "3", classOf[Gt[_]], "4")
sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetPartitionDiscoverySuite.scala

Lines changed: 126 additions & 0 deletions (new file)

@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.parquet
+
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.hadoop.fs.Path
+import org.scalatest.FunSuite
+
+import org.apache.spark.sql.catalyst.expressions.Literal
+import org.apache.spark.sql.parquet.ParquetRelation2._
+import org.apache.spark.sql.test.TestSQLContext
+import org.apache.spark.sql.types._
+import org.apache.spark.sql.{Row, SQLContext}
+
+class ParquetPartitionDiscoverySuite extends FunSuite with ParquetTest {
+  override val sqlContext: SQLContext = TestSQLContext
+
+  val defaultPartitionName = "__NULL__"
+
+  test("column type inference") {
+    def check(raw: String, literal: Literal): Unit = {
+      assert(inferPartitionColumnValue(raw, defaultPartitionName) === literal)
+    }
+
+    check("10", Literal(10, IntegerType))
+    check("1000000000000000", Literal(1000000000000000L, LongType))
+    check("1.5", Literal(1.5, FloatType))
+    check("hello", Literal("hello", StringType))
+    check(defaultPartitionName, Literal(null, NullType))
+  }
+
+  test("parse partition") {
+    def check(path: String, expected: PartitionValues): Unit = {
+      assert(expected === parsePartition(new Path(path), defaultPartitionName))
+    }
+
+    def checkThrows[T <: Throwable: Manifest](path: String, expected: String): Unit = {
+      val message = intercept[T] {
+        parsePartition(new Path(path), defaultPartitionName)
+      }.getMessage
+
+      assert(message.contains(expected))
+    }
+
+    check(
+      "file:///",
+      PartitionValues(
+        ArrayBuffer.empty[String],
+        ArrayBuffer.empty[Literal]))
+
+    check(
+      "file://path/a=10",
+      PartitionValues(
+        ArrayBuffer("a"),
+        ArrayBuffer(Literal(10, IntegerType))))
+
+    check(
+      "file://path/a=10/b=hello/c=1.5",
+      PartitionValues(
+        ArrayBuffer("a", "b", "c"),
+        ArrayBuffer(
+          Literal(10, IntegerType),
+          Literal("hello", StringType),
+          Literal(1.5, FloatType))))
+
+    check(
+      "file://path/a=10/b_hello/c=1.5",
+      PartitionValues(
+        ArrayBuffer("c"),
+        ArrayBuffer(Literal(1.5, FloatType))))
+
+    checkThrows[AssertionError]("file://path/=10", "Empty partition column name")
+    checkThrows[AssertionError]("file://path/a=", "Empty partition column value")
+  }
+
+  test("parse partitions") {
+    def check(paths: Seq[String], spec: PartitionSpec): Unit = {
+      assert(parsePartitions(paths.map(new Path(_)), defaultPartitionName) === spec)
+    }
+
+    check(Seq(
+      "hdfs://host:9000/path/a=10/b=hello"),
+      PartitionSpec(
+        StructType(Seq(
+          StructField("a", IntegerType),
+          StructField("b", StringType))),
+        Seq(Partition(Row(10, "hello"), "hdfs://host:9000/path/a=10/b=hello"))))
+
+    check(Seq(
+      "hdfs://host:9000/path/a=10/b=20",
+      "hdfs://host:9000/path/a=10.5/b=hello"),
+      PartitionSpec(
+        StructType(Seq(
+          StructField("a", FloatType),
+          StructField("b", StringType))),
+        Seq(
+          Partition(Row(10, "20"), "hdfs://host:9000/path/a=10/b=20"),
+          Partition(Row(10.5, "hello"), "hdfs://host:9000/path/a=10.5/b=hello"))))
+
+    check(Seq(
+      s"hdfs://host:9000/path/a=10/b=$defaultPartitionName",
+      s"hdfs://host:9000/path/a=10.5/b=$defaultPartitionName"),
+      PartitionSpec(
+        StructType(Seq(
+          StructField("a", FloatType),
+          StructField("b", StringType))),
+        Seq(
+          Partition(Row(10, null), s"hdfs://host:9000/path/a=10/b=$defaultPartitionName"),
+          Partition(Row(10.5, null), s"hdfs://host:9000/path/a=10.5/b=$defaultPartitionName"))))
+  }
+}
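The expectations in the "column type inference" test pin down an inference order of Int, then Long, then Float, with String as the fallback and the default partition name mapping to a null literal. A minimal re-implementation sketch of just that ordering (inferValueSketch is illustrative, not Spark's API; it returns a plain (value, type-name) pair rather than a Catalyst Literal):

import scala.util.Try

object InferSketch extends App {
  // Hypothetical helper mirroring the inference order the test pins down:
  // Int, then Long, then Float, falling back to String; the configured
  // default partition name maps to null.
  def inferValueSketch(raw: String, defaultPartitionName: String): (Any, String) =
    if (raw == defaultPartitionName) (null, "NullType")
    else Try[(Any, String)]((raw.toInt, "IntegerType"))
      .orElse(Try[(Any, String)]((raw.toLong, "LongType")))
      .orElse(Try[(Any, String)]((raw.toFloat, "FloatType")))
      .getOrElse((raw, "StringType"))

  assert(inferValueSketch("10", "__NULL__") == ((10, "IntegerType")))
  assert(inferValueSketch("1000000000000000", "__NULL__") == ((1000000000000000L, "LongType")))
  assert(inferValueSketch("1.5", "__NULL__") == ((1.5f, "FloatType")))
  assert(inferValueSketch("hello", "__NULL__") == (("hello", "StringType")))
  assert(inferValueSketch("__NULL__", "__NULL__") == ((null, "NullType")))
}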

sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetSchemaSuite.scala

Lines changed: 37 additions & 0 deletions

@@ -25,6 +25,7 @@ import parquet.schema.MessageTypeParser
 
 import org.apache.spark.sql.catalyst.ScalaReflection
 import org.apache.spark.sql.test.TestSQLContext
+import org.apache.spark.sql.types._
 
 class ParquetSchemaSuite extends FunSuite with ParquetTest {
   val sqlContext = TestSQLContext

@@ -192,4 +193,40 @@ class ParquetSchemaSuite extends FunSuite with ParquetTest {
       assert(a.nullable === b.nullable)
     }
   }
+
+  test("merge with metastore schema") {
+    // Field type conflict resolution
+    assertResult(
+      StructType(Seq(
+        StructField("lowerCase", StringType),
+        StructField("UPPERCase", DoubleType, nullable = false)))) {
+
+      ParquetRelation2.mergeMetastoreParquetSchema(
+        StructType(Seq(
+          StructField("lowercase", StringType),
+          StructField("uppercase", DoubleType, nullable = false))),
+
+        StructType(Seq(
+          StructField("lowerCase", BinaryType),
+          StructField("UPPERCase", IntegerType, nullable = true))))
+    }
+
+    // Conflicting field count
+    assert(intercept[Throwable] {
+      ParquetRelation2.mergeMetastoreParquetSchema(
+        StructType(Seq(
+          StructField("uppercase", DoubleType, nullable = false))),
+
+        StructType(Seq(
+          StructField("lowerCase", BinaryType),
+          StructField("UPPERCase", IntegerType, nullable = true))))
+    }.getMessage.contains("detected conflicting schemas"))
+
+    // Conflicting field names
+    intercept[Throwable] {
+      ParquetRelation2.mergeMetastoreParquetSchema(
+        StructType(Seq(StructField("lower", StringType))),
+        StructType(Seq(StructField("lowerCase", BinaryType))))
+    }
+  }
 }
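Reading the expected result against the two inputs in "merge with metastore schema": the merged schema takes types and nullability from the metastore side but field-name casing from the Parquet side, and it rejects any field-count or case-insensitive name mismatch. A simplified sketch of that contract, assuming positionally aligned fields (Field and mergeSketch are illustrative stand-ins, not Spark's types):

object MergeSketch extends App {
  // Illustrative stand-in for StructField.
  case class Field(name: String, dataType: String, nullable: Boolean = true)

  // Simplified contract, assuming fields are positionally aligned: keep the
  // metastore's type and nullability, but the Parquet side's name casing;
  // reject count or (case-insensitive) name mismatches.
  def mergeSketch(metastore: Seq[Field], parquet: Seq[Field]): Seq[Field] = {
    assert(metastore.size == parquet.size, "detected conflicting schemas")
    metastore.zip(parquet).map { case (m, p) =>
      assert(m.name.equalsIgnoreCase(p.name), "detected conflicting schemas")
      Field(p.name, m.dataType, m.nullable)
    }
  }

  val merged = mergeSketch(
    Seq(Field("lowercase", "string"), Field("uppercase", "double", nullable = false)),
    Seq(Field("lowerCase", "binary"), Field("UPPERCase", "int")))

  assert(merged == Seq(
    Field("lowerCase", "string"),
    Field("UPPERCase", "double", nullable = false)))
}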
