Skip to content

Commit 6f009a2

Browse files
committed
More tests and comments
1 parent b6f4526 commit 6f009a2

File tree

3 files changed

+76
-6
lines changed

3 files changed

+76
-6
lines changed

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystReadSupport.scala

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,10 @@ private[parquet] object CatalystReadSupport {
102102

103103
val SPARK_METADATA_KEY = "org.apache.spark.sql.parquet.row.metadata"
104104

105+
/**
106+
* Tailors `parquetSchema` according to `catalystSchema` by removing column paths that don't exist
107+
* in `catalystSchema`, and adding those that only exist in `catalystSchema`.
108+
*/
105109
def clipParquetSchema(parquetSchema: MessageType, catalystSchema: StructType): MessageType = {
106110
val clippedGroup = clipParquetType(parquetSchema.asGroupType(), catalystSchema).asGroupType()
107111
Types.buildMessage().addFields(clippedGroup.getFields.asScala: _*).named("root")
@@ -131,6 +135,8 @@ private[parquet] object CatalystReadSupport {
131135
}
132136

133137
private def clipParquetListType(parquetList: GroupType, elementType: DataType): Type = {
138+
assert(!isPrimitiveCatalystType(elementType))
139+
134140
// Unannotated repeated group, list element type is just the group itself. Clip it.
135141
if (parquetList.getOriginalType == null && parquetList.isRepetition(Repetition.REPEATED)) {
136142
clipParquetType(parquetList, elementType)
@@ -175,11 +181,13 @@ private[parquet] object CatalystReadSupport {
175181

176182
private def clipParquetMapType(
177183
parquetMap: GroupType, keyType: DataType, valueType: DataType): GroupType = {
184+
assert(!isPrimitiveCatalystType(valueType))
185+
178186
val repeatedGroup = parquetMap.getType(0).asGroupType()
179187
val parquetKeyType = repeatedGroup.getType(0)
180188
val parquetValueType = repeatedGroup.getType(1)
181189

182-
val clippedRepeatedGrouop =
190+
val clippedRepeatedGroup =
183191
Types
184192
.repeatedGroup()
185193
.as(repeatedGroup.getOriginalType)
@@ -190,7 +198,7 @@ private[parquet] object CatalystReadSupport {
190198
Types
191199
.buildGroup(parquetMap.getRepetition)
192200
.as(parquetMap.getOriginalType)
193-
.addField(clippedRepeatedGrouop)
201+
.addField(clippedRepeatedGroup)
194202
.named(parquetMap.getName)
195203
}
196204

@@ -206,6 +214,11 @@ private[parquet] object CatalystReadSupport {
206214
}
207215
}
208216

217+
// Here we can't use builder methods defined in `Types` to construct the `GroupType` and have to
218+
// resort to this deprecated constructor. The reason is that, `tailoredFields` can be empty,
219+
// and `Types` builder methods don't allow constructing empty group types. For example, query
220+
// `SELECT COUNT(1) FROM t` requests zero columns.
221+
// TODO Refactor method signature to return a list of fields instead of a `GroupType`
209222
new GroupType(
210223
parquetRecord.getRepetition,
211224
parquetRecord.getName,

sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala

Lines changed: 60 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -230,7 +230,7 @@ class ParquetQuerySuite extends QueryTest with ParquetTest with SharedSQLContext
230230
}
231231
}
232232

233-
test("SPARK-10301") {
233+
test("SPARK-10301 Clipping nested structs in requested schema") {
234234
withTempPath { dir =>
235235
val path = dir.getCanonicalPath
236236
val df = sqlContext
@@ -240,13 +240,70 @@ class ParquetQuerySuite extends QueryTest with ParquetTest with SharedSQLContext
240240

241241
df.write.mode("append").parquet(path)
242242

243+
val userDefinedSchema = new StructType()
244+
.add("s", new StructType().add("a", LongType, nullable = true), nullable = true)
245+
246+
checkAnswer(
247+
sqlContext.read.schema(userDefinedSchema).parquet(path),
248+
Row(Row(0)))
249+
}
250+
251+
withTempPath { dir =>
252+
val path = dir.getCanonicalPath
253+
254+
val df1 = sqlContext
255+
.range(1)
256+
.selectExpr("NAMED_STRUCT('a', id, 'b', id) AS s")
257+
.coalesce(1)
258+
259+
val df2 = sqlContext
260+
.range(1, 2)
261+
.selectExpr("NAMED_STRUCT('b', id, 'c', id) AS s")
262+
.coalesce(1)
263+
264+
df1.write.parquet(path)
265+
df2.write.mode(SaveMode.Append).parquet(path)
266+
267+
val userDefinedSchema = new StructType()
268+
.add("s",
269+
new StructType()
270+
.add("a", LongType, nullable = true)
271+
.add("c", LongType, nullable = true),
272+
nullable = true)
273+
274+
checkAnswer(
275+
sqlContext.read.schema(userDefinedSchema).parquet(path),
276+
Seq(
277+
Row(Row(0, null)),
278+
Row(Row(null, 1))))
279+
}
280+
281+
withTempPath { dir =>
282+
val path = dir.getCanonicalPath
283+
284+
val df = sqlContext
285+
.range(1)
286+
.selectExpr("NAMED_STRUCT('a', ARRAY(NAMED_STRUCT('b', id, 'c', id))) AS s")
287+
.coalesce(1)
288+
289+
df.write.parquet(path)
290+
243291
val userDefinedSchema = new StructType()
244292
.add("s",
245293
new StructType()
246-
.add("a", LongType, nullable = true),
294+
.add(
295+
"a",
296+
ArrayType(
297+
new StructType()
298+
.add("b", LongType, nullable = true)
299+
.add("d", StringType, nullable = true),
300+
containsNull = true),
301+
nullable = true),
247302
nullable = true)
248303

249-
checkAnswer(sqlContext.read.schema(userDefinedSchema).parquet(path), Row(Row(0)))
304+
checkAnswer(
305+
sqlContext.read.schema(userDefinedSchema).parquet(path),
306+
Row(Row(Seq(Row(0, null)))))
250307
}
251308
}
252309
}

sql/core/src/test/scala/org/apache/spark/sql/test/SharedSQLContext.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717

1818
package org.apache.spark.sql.test
1919

20-
import org.apache.spark.sql.{Column, ColumnName, SQLContext}
20+
import org.apache.spark.sql.{ColumnName, SQLContext}
2121

2222

2323
/**

0 commit comments

Comments
 (0)