Skip to content

Commit 2f05196

Browse files
committed
Fix RowWriteSupport to handle empty ArrayType correctly.
1 parent 1114207 commit 2f05196

File tree

3 files changed

+16
-16
lines changed

3 files changed

+16
-16
lines changed

sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetConverter.scala

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -229,9 +229,9 @@ private[parquet] class CatalystGroupConverter(
229229
this(attributes.map(a => new FieldType(a.name, a.dataType, a.nullable)), 0, null)
230230

231231
protected [parquet] val converters: Array[Converter] =
232-
schema.map(field =>
233-
CatalystConverter.createConverter(field, schema.indexOf(field), this))
234-
.toArray
232+
schema.zipWithIndex.map {
233+
case (field, idx) => CatalystConverter.createConverter(field, idx, this)
234+
}.toArray
235235

236236
override val size = schema.size
237237

@@ -288,9 +288,9 @@ private[parquet] class CatalystPrimitiveRowConverter(
288288
new ParquetRelation.RowType(attributes.length))
289289

290290
protected [parquet] val converters: Array[Converter] =
291-
schema.map(field =>
292-
CatalystConverter.createConverter(field, schema.indexOf(field), this))
293-
.toArray
291+
schema.zipWithIndex.map {
292+
case (field, idx) => CatalystConverter.createConverter(field, idx, this)
293+
}.toArray
294294

295295
override val size = schema.size
296296

sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -156,7 +156,7 @@ private[parquet] class RowWriteSupport extends WriteSupport[Row] with Logging {
156156
writer.startMessage()
157157
while(index < attributes.size) {
158158
// null values indicate optional fields but we do not check currently
159-
if (record(index) != null && record(index) != Nil) {
159+
if (record(index) != null) {
160160
writer.startField(attributes(index).name, index)
161161
writeValue(attributes(index).dataType, record(index))
162162
writer.endField(attributes(index).name, index)
@@ -167,7 +167,7 @@ private[parquet] class RowWriteSupport extends WriteSupport[Row] with Logging {
167167
}
168168

169169
private[parquet] def writeValue(schema: DataType, value: Any): Unit = {
170-
if (value != null && value != Nil) {
170+
if (value != null) {
171171
schema match {
172172
case t @ ArrayType(_) => writeArray(
173173
t,
@@ -184,7 +184,7 @@ private[parquet] class RowWriteSupport extends WriteSupport[Row] with Logging {
184184
}
185185

186186
private[parquet] def writePrimitive(schema: PrimitiveType, value: Any): Unit = {
187-
if (value != null && value != Nil) {
187+
if (value != null) {
188188
schema match {
189189
case StringType => writer.addBinary(
190190
Binary.fromByteArray(
@@ -206,12 +206,12 @@ private[parquet] class RowWriteSupport extends WriteSupport[Row] with Logging {
206206
private[parquet] def writeStruct(
207207
schema: StructType,
208208
struct: CatalystConverter.StructScalaType[_]): Unit = {
209-
if (struct != null && struct != Nil) {
209+
if (struct != null) {
210210
val fields = schema.fields.toArray
211211
writer.startGroup()
212212
var i = 0
213213
while(i < fields.size) {
214-
if (struct(i) != null && struct(i) != Nil) {
214+
if (struct(i) != null) {
215215
writer.startField(fields(i).name, i)
216216
writeValue(fields(i).dataType, struct(i))
217217
writer.endField(fields(i).name, i)

sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,7 @@ case class AllDataTypesWithNonPrimitiveType(
7878
booleanField: Boolean,
7979
array: Seq[Int],
8080
map: Map[Int, String],
81-
nested: Nested)
81+
data: Data)
8282

8383
class ParquetQuerySuite extends QueryTest with FunSuiteLike with BeforeAndAfterAll {
8484
TestData // Load test data tables.
@@ -138,7 +138,7 @@ class ParquetQuerySuite extends QueryTest with FunSuiteLike with BeforeAndAfterA
138138
TestSQLContext.sparkContext.parallelize(range)
139139
.map(x => AllDataTypesWithNonPrimitiveType(
140140
s"$x", x, x.toLong, x.toFloat, x.toDouble, x.toShort, x.toByte, x % 2 == 0,
141-
Seq(x), Map(x -> s"$x"), Nested(x, s"$x")))
141+
(0 until x), (0 until x).map(i => i -> s"$i").toMap, Data((0 until x), Nested(x, s"$x"))))
142142
.saveAsParquetFile(tempDir)
143143
val result = parquetFile(tempDir).collect()
144144
range.foreach {
@@ -151,9 +151,9 @@ class ParquetQuerySuite extends QueryTest with FunSuiteLike with BeforeAndAfterA
151151
assert(result(i).getShort(5) === i.toShort)
152152
assert(result(i).getByte(6) === i.toByte)
153153
assert(result(i).getBoolean(7) === (i % 2 == 0))
154-
assert(result(i)(8) === Seq(i))
155-
assert(result(i)(9) === Map(i -> s"$i"))
156-
assert(result(i)(10) === new GenericRow(Array[Any](i, s"$i")))
154+
assert(result(i)(8) === (0 until i))
155+
assert(result(i)(9) === (0 until i).map(i => i -> s"$i").toMap)
156+
assert(result(i)(10) === new GenericRow(Array[Any]((0 until i), new GenericRow(Array[Any](i, s"$i")))))
157157
}
158158
}
159159

0 commit comments

Comments (0)