Commit 9fe693b

ueshin authored and marmbrus committed
[SPARK-2446][SQL] Add BinaryType support to Parquet I/O.
Note that this commit changes the semantics when loading in data that was created with prior versions of Spark SQL. Before, we were writing out strings as Binary data without adding any other annotations. Thus, when data is read in from prior versions, data that was StringType will now become BinaryType. Users that need strings can CAST that column to a String. It was decided that while this breaks compatibility, it does make us compatible with other systems (Hive, Thrift, etc) and adds support for Binary data, so this is the right decision long term. To support `BinaryType`, the following changes are needed: - Make `StringType` use `OriginalType.UTF8` - Add `BinaryType` using `PrimitiveTypeName.BINARY` without `OriginalType` Author: Takuya UESHIN <[email protected]> Closes apache#1373 from ueshin/issues/SPARK-2446 and squashes the following commits: ecacb92 [Takuya UESHIN] Add BinaryType support to Parquet I/O. 616e04a [Takuya UESHIN] Make StringType use OriginalType.UTF8.
1 parent 3dd8af7 commit 9fe693b
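
As the commit message notes, string columns written by earlier Spark SQL versions now load as `BinaryType`, and a CAST recovers the strings. A minimal sketch of that workaround, assuming a hypothetical file `old-strings.parquet` with a column named `mystring` (both names are illustrative, not from this commit) and the Spark SQL API of this era:

```scala
import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext

// Sketch only: "old-strings.parquet" and the column name "mystring"
// are hypothetical placeholders.
val sc = new SparkContext("local", "BinaryTypeMigration")
val sqlContext = new SQLContext(sc)

// Strings written before this change were stored as plain BINARY with no
// UTF8 annotation, so after this commit they load as BinaryType.
val oldData = sqlContext.parquetFile("old-strings.parquet")
oldData.registerAsTable("old_data")

// Per the commit message, casting the column back yields strings again.
val strings = sqlContext.sql("SELECT CAST(mystring AS STRING) FROM old_data")
strings.collect().foreach(println)
```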

File tree

5 files changed: +57 −45 lines


sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetConverter.scala

Lines changed: 1 addition & 1 deletion
@@ -114,7 +114,7 @@ private[sql] object CatalystConverter {
         }
       }
       // All other primitive types use the default converter
-      case ctype: NativeType => { // note: need the type tag here!
+      case ctype: PrimitiveType => { // note: need the type tag here!
        new CatalystPrimitiveConverter(parent, fieldIndex)
      }
      case _ => throw new RuntimeException(

sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala

Lines changed: 4 additions & 0 deletions
@@ -191,6 +191,8 @@ private[parquet] class RowWriteSupport extends WriteSupport[Row] with Logging {
           value.asInstanceOf[String].getBytes("utf-8")
         )
       )
+      case BinaryType => writer.addBinary(
+        Binary.fromByteArray(value.asInstanceOf[Array[Byte]]))
       case IntegerType => writer.addInteger(value.asInstanceOf[Int])
       case ShortType => writer.addInteger(value.asInstanceOf[Short])
       case LongType => writer.addLong(value.asInstanceOf[Long])
@@ -299,6 +301,8 @@ private[parquet] class MutableRowWriteSupport extends RowWriteSupport {
           record(index).asInstanceOf[String].getBytes("utf-8")
         )
       )
+      case BinaryType => writer.addBinary(
+        Binary.fromByteArray(record(index).asInstanceOf[Array[Byte]]))
       case IntegerType => writer.addInteger(record.getInt(index))
       case ShortType => writer.addInteger(record.getShort(index))
       case LongType => writer.addLong(record.getLong(index))

sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTestData.scala

Lines changed: 9 additions & 9 deletions
@@ -58,7 +58,7 @@ private[sql] object ParquetTestData {
   """message myrecord {
       optional boolean myboolean;
       optional int32 myint;
-      optional binary mystring;
+      optional binary mystring (UTF8);
       optional int64 mylong;
       optional float myfloat;
       optional double mydouble;
@@ -87,7 +87,7 @@ private[sql] object ParquetTestData {
   message myrecord {
       required boolean myboolean;
       required int32 myint;
-      required binary mystring;
+      required binary mystring (UTF8);
       required int64 mylong;
       required float myfloat;
       required double mydouble;
@@ -119,14 +119,14 @@ private[sql] object ParquetTestData {
   // so that array types can be translated correctly.
   """
     message AddressBook {
-      required binary owner;
+      required binary owner (UTF8);
       optional group ownerPhoneNumbers {
-        repeated binary array;
+        repeated binary array (UTF8);
       }
       optional group contacts {
         repeated group array {
-          required binary name;
-          optional binary phoneNumber;
+          required binary name (UTF8);
+          optional binary phoneNumber (UTF8);
         }
       }
     }
@@ -181,16 +181,16 @@ private[sql] object ParquetTestData {
       required int32 x;
       optional group data1 {
         repeated group map {
-          required binary key;
+          required binary key (UTF8);
           required int32 value;
         }
       }
       required group data2 {
         repeated group map {
-          required binary key;
+          required binary key (UTF8);
           required group value {
             required int64 payload1;
-            optional binary payload2;
+            optional binary payload2 (UTF8);
           }
         }
       }

sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypes.scala

Lines changed: 32 additions & 30 deletions
@@ -42,20 +42,22 @@ private[parquet] object ParquetTypesConverter extends Logging {
   def isPrimitiveType(ctype: DataType): Boolean =
     classOf[PrimitiveType] isAssignableFrom ctype.getClass

-  def toPrimitiveDataType(parquetType : ParquetPrimitiveTypeName): DataType = parquetType match {
-    case ParquetPrimitiveTypeName.BINARY => StringType
-    case ParquetPrimitiveTypeName.BOOLEAN => BooleanType
-    case ParquetPrimitiveTypeName.DOUBLE => DoubleType
-    case ParquetPrimitiveTypeName.FIXED_LEN_BYTE_ARRAY => ArrayType(ByteType)
-    case ParquetPrimitiveTypeName.FLOAT => FloatType
-    case ParquetPrimitiveTypeName.INT32 => IntegerType
-    case ParquetPrimitiveTypeName.INT64 => LongType
-    case ParquetPrimitiveTypeName.INT96 =>
-      // TODO: add BigInteger type? TODO(andre) use DecimalType instead????
-      sys.error("Potential loss of precision: cannot convert INT96")
-    case _ => sys.error(
-      s"Unsupported parquet datatype $parquetType")
-  }
+  def toPrimitiveDataType(parquetType: ParquetPrimitiveType): DataType =
+    parquetType.getPrimitiveTypeName match {
+      case ParquetPrimitiveTypeName.BINARY
+        if parquetType.getOriginalType == ParquetOriginalType.UTF8 => StringType
+      case ParquetPrimitiveTypeName.BINARY => BinaryType
+      case ParquetPrimitiveTypeName.BOOLEAN => BooleanType
+      case ParquetPrimitiveTypeName.DOUBLE => DoubleType
+      case ParquetPrimitiveTypeName.FLOAT => FloatType
+      case ParquetPrimitiveTypeName.INT32 => IntegerType
+      case ParquetPrimitiveTypeName.INT64 => LongType
+      case ParquetPrimitiveTypeName.INT96 =>
+        // TODO: add BigInteger type? TODO(andre) use DecimalType instead????
+        sys.error("Potential loss of precision: cannot convert INT96")
+      case _ => sys.error(
+        s"Unsupported parquet datatype $parquetType")
+    }

   /**
    * Converts a given Parquet `Type` into the corresponding
@@ -104,7 +106,7 @@ private[parquet] object ParquetTypesConverter extends Logging {
     }

     if (parquetType.isPrimitive) {
-      toPrimitiveDataType(parquetType.asPrimitiveType.getPrimitiveTypeName)
+      toPrimitiveDataType(parquetType.asPrimitiveType)
     } else {
       val groupType = parquetType.asGroupType()
       parquetType.getOriginalType match {
@@ -164,18 +166,17 @@ private[parquet] object ParquetTypesConverter extends Logging {
    * @return The name of the corresponding Parquet primitive type
    */
   def fromPrimitiveDataType(ctype: DataType):
-      Option[ParquetPrimitiveTypeName] = ctype match {
-    case StringType => Some(ParquetPrimitiveTypeName.BINARY)
-    case BooleanType => Some(ParquetPrimitiveTypeName.BOOLEAN)
-    case DoubleType => Some(ParquetPrimitiveTypeName.DOUBLE)
-    case ArrayType(ByteType) =>
-      Some(ParquetPrimitiveTypeName.FIXED_LEN_BYTE_ARRAY)
-    case FloatType => Some(ParquetPrimitiveTypeName.FLOAT)
-    case IntegerType => Some(ParquetPrimitiveTypeName.INT32)
+      Option[(ParquetPrimitiveTypeName, Option[ParquetOriginalType])] = ctype match {
+    case StringType => Some(ParquetPrimitiveTypeName.BINARY, Some(ParquetOriginalType.UTF8))
+    case BinaryType => Some(ParquetPrimitiveTypeName.BINARY, None)
+    case BooleanType => Some(ParquetPrimitiveTypeName.BOOLEAN, None)
+    case DoubleType => Some(ParquetPrimitiveTypeName.DOUBLE, None)
+    case FloatType => Some(ParquetPrimitiveTypeName.FLOAT, None)
+    case IntegerType => Some(ParquetPrimitiveTypeName.INT32, None)
     // There is no type for Byte or Short so we promote them to INT32.
-    case ShortType => Some(ParquetPrimitiveTypeName.INT32)
-    case ByteType => Some(ParquetPrimitiveTypeName.INT32)
-    case LongType => Some(ParquetPrimitiveTypeName.INT64)
+    case ShortType => Some(ParquetPrimitiveTypeName.INT32, None)
+    case ByteType => Some(ParquetPrimitiveTypeName.INT32, None)
+    case LongType => Some(ParquetPrimitiveTypeName.INT64, None)
     case _ => None
   }

@@ -227,17 +228,18 @@ private[parquet] object ParquetTypesConverter extends Logging {
       if (nullable) Repetition.OPTIONAL else Repetition.REQUIRED
     }
     val primitiveType = fromPrimitiveDataType(ctype)
-    if (primitiveType.isDefined) {
-      new ParquetPrimitiveType(repetition, primitiveType.get, name)
-    } else {
+    primitiveType.map {
+      case (primitiveType, originalType) =>
+        new ParquetPrimitiveType(repetition, primitiveType, name, originalType.orNull)
+    }.getOrElse {
       ctype match {
         case ArrayType(elementType) => {
           val parquetElementType = fromDataType(
             elementType,
             CatalystConverter.ARRAY_ELEMENTS_SCHEMA_NAME,
             nullable = false,
             inArray = true)
-            ConversionPatterns.listType(repetition, name, parquetElementType)
+          ConversionPatterns.listType(repetition, name, parquetElementType)
         }
         case StructType(structFields) => {
           val fields = structFields.map {

sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala

Lines changed: 11 additions & 5 deletions
@@ -65,7 +65,8 @@ case class AllDataTypes(
     doubleField: Double,
     shortField: Short,
     byteField: Byte,
-    booleanField: Boolean)
+    booleanField: Boolean,
+    binaryField: Array[Byte])

 case class AllDataTypesWithNonPrimitiveType(
     stringField: String,
@@ -76,6 +77,7 @@ case class AllDataTypesWithNonPrimitiveType(
     shortField: Short,
     byteField: Byte,
     booleanField: Boolean,
+    binaryField: Array[Byte],
     array: Seq[Int],
     map: Map[Int, String],
     data: Data)
@@ -116,7 +118,8 @@ class ParquetQuerySuite extends QueryTest with FunSuiteLike with BeforeAndAfterA
     val tempDir = getTempFilePath("parquetTest").getCanonicalPath
     val range = (0 to 255)
     TestSQLContext.sparkContext.parallelize(range)
-      .map(x => AllDataTypes(s"$x", x, x.toLong, x.toFloat, x.toDouble, x.toShort, x.toByte, x % 2 == 0))
+      .map(x => AllDataTypes(s"$x", x, x.toLong, x.toFloat, x.toDouble, x.toShort, x.toByte, x % 2 == 0,
+        (0 to x).map(_.toByte).toArray))
       .saveAsParquetFile(tempDir)
     val result = parquetFile(tempDir).collect()
     range.foreach {
@@ -129,6 +132,7 @@ class ParquetQuerySuite extends QueryTest with FunSuiteLike with BeforeAndAfterA
       assert(result(i).getShort(5) === i.toShort)
       assert(result(i).getByte(6) === i.toByte)
       assert(result(i).getBoolean(7) === (i % 2 == 0))
+      assert(result(i)(8) === (0 to i).map(_.toByte).toArray)
     }
   }
@@ -138,6 +142,7 @@ class ParquetQuerySuite extends QueryTest with FunSuiteLike with BeforeAndAfterA
     TestSQLContext.sparkContext.parallelize(range)
       .map(x => AllDataTypesWithNonPrimitiveType(
         s"$x", x, x.toLong, x.toFloat, x.toDouble, x.toShort, x.toByte, x % 2 == 0,
+        (0 to x).map(_.toByte).toArray,
         (0 until x), (0 until x).map(i => i -> s"$i").toMap, Data((0 until x), Nested(x, s"$x"))))
       .saveAsParquetFile(tempDir)
     val result = parquetFile(tempDir).collect()
@@ -151,9 +156,10 @@ class ParquetQuerySuite extends QueryTest with FunSuiteLike with BeforeAndAfterA
       assert(result(i).getShort(5) === i.toShort)
       assert(result(i).getByte(6) === i.toByte)
      assert(result(i).getBoolean(7) === (i % 2 == 0))
-      assert(result(i)(8) === (0 until i))
-      assert(result(i)(9) === (0 until i).map(i => i -> s"$i").toMap)
-      assert(result(i)(10) === new GenericRow(Array[Any]((0 until i), new GenericRow(Array[Any](i, s"$i")))))
+      assert(result(i)(8) === (0 to i).map(_.toByte).toArray)
+      assert(result(i)(9) === (0 until i))
+      assert(result(i)(10) === (0 until i).map(i => i -> s"$i").toMap)
+      assert(result(i)(11) === new GenericRow(Array[Any]((0 until i), new GenericRow(Array[Any](i, s"$i")))))
     }
   }
