@@ -127,6 +127,12 @@ private[sql] object CatalystConverter {
             parent.updateByte(fieldIndex, value.asInstanceOf[ByteType.JvmType])
         }
       }
+      case DateType => {
+        new CatalystPrimitiveConverter(parent, fieldIndex) {
+          override def addInt(value: Int): Unit =
+            parent.updateDate(fieldIndex, value.asInstanceOf[DateType.JvmType])
+        }
+      }
       case d: DecimalType => {
         new CatalystPrimitiveConverter(parent, fieldIndex) {
           override def addBinary(value: Binary): Unit =
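
Note on the read path above: Catalyst's DateType.JvmType is an Int holding a day count since the Unix epoch (1970-01-01), which is why addInt can hand the Parquet INT32 value straight to updateDate. A minimal sketch of that representation, with illustrative names (the real DateUtils helpers also perform time-zone normalization that this sketch ignores):

    import java.util.concurrent.TimeUnit

    // Illustrative stand-ins for Spark's DateUtils helpers.
    object DaysSinceEpochSketch {
      // Day count -> java.sql.Date, ignoring the time-zone handling
      // the real implementation performs.
      def toJavaDate(days: Int): java.sql.Date =
        new java.sql.Date(TimeUnit.DAYS.toMillis(days.toLong))

      // java.sql.Date -> day count.
      def fromJavaDate(date: java.sql.Date): Int =
        TimeUnit.MILLISECONDS.toDays(date.getTime).toInt
    }
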
@@ -192,6 +198,9 @@ private[parquet] abstract class CatalystConverter extends GroupConverter {
   protected[parquet] def updateInt(fieldIndex: Int, value: Int): Unit =
     updateField(fieldIndex, value)
 
+  protected[parquet] def updateDate(fieldIndex: Int, value: Int): Unit =
+    updateField(fieldIndex, value)
+
   protected[parquet] def updateLong(fieldIndex: Int, value: Long): Unit =
     updateField(fieldIndex, value)
 
@@ -388,6 +397,9 @@ private[parquet] class CatalystPrimitiveRowConverter(
   override protected[parquet] def updateInt(fieldIndex: Int, value: Int): Unit =
     current.setInt(fieldIndex, value)
 
+  override protected[parquet] def updateDate(fieldIndex: Int, value: Int): Unit =
+    current.update(fieldIndex, value)
+
   override protected[parquet] def updateLong(fieldIndex: Int, value: Long): Unit =
     current.setLong(fieldIndex, value)
 
@@ -212,6 +212,7 @@ private[parquet] class RowWriteSupport extends WriteSupport[Row] with Logging {
       case DoubleType => writer.addDouble(value.asInstanceOf[Double])
       case FloatType => writer.addFloat(value.asInstanceOf[Float])
       case BooleanType => writer.addBoolean(value.asInstanceOf[Boolean])
+      case DateType => writer.addInteger(value.asInstanceOf[Int])
       case d: DecimalType =>
         if (d.precisionInfo == None || d.precisionInfo.get.precision > 18) {
           sys.error(s"Unsupported datatype $d, cannot write to consumer")
@@ -358,6 +359,7 @@ private[parquet] class MutableRowWriteSupport extends RowWriteSupport {
       case DoubleType => writer.addDouble(record.getDouble(index))
       case FloatType => writer.addFloat(record.getFloat(index))
       case BooleanType => writer.addBoolean(record.getBoolean(index))
+      case DateType => writer.addInteger(record.getInt(index))
       case TimestampType => writeTimestamp(record(index).asInstanceOf[java.sql.Timestamp])
       case d: DecimalType =>
         if (d.precisionInfo == None || d.precisionInfo.get.precision > 18) {
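
Both write paths above emit a date as a plain INT32 through the parquet-mr RecordConsumer: by this point the row already holds the Catalyst representation (an Int day count), not a java.sql.Date. Roughly what the consumer receives for one date field, with a hypothetical field name and index:

    // Sketch only; `writer` is the surrounding RecordConsumer.
    writer.startField("d", 0)
    writer.addInteger(16436) // 16,436 days after 1970-01-01, i.e. 2015-01-01
    writer.endField("d", 0)
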
@@ -64,6 +64,8 @@ private[parquet] object ParquetTypesConverter extends Logging {
     case ParquetPrimitiveTypeName.BOOLEAN => BooleanType
     case ParquetPrimitiveTypeName.DOUBLE => DoubleType
     case ParquetPrimitiveTypeName.FLOAT => FloatType
+    case ParquetPrimitiveTypeName.INT32
+      if originalType == ParquetOriginalType.DATE => DateType
     case ParquetPrimitiveTypeName.INT32 => IntegerType
     case ParquetPrimitiveTypeName.INT64 => LongType
     case ParquetPrimitiveTypeName.INT96 if int96AsTimestamp => TimestampType
@@ -222,6 +224,8 @@ private[parquet] object ParquetTypesConverter extends Logging {
     // There is no type for Byte or Short so we promote them to INT32.
     case ShortType => Some(ParquetTypeInfo(ParquetPrimitiveTypeName.INT32))
     case ByteType => Some(ParquetTypeInfo(ParquetPrimitiveTypeName.INT32))
+    case DateType => Some(ParquetTypeInfo(
+      ParquetPrimitiveTypeName.INT32, Some(ParquetOriginalType.DATE)))
     case LongType => Some(ParquetTypeInfo(ParquetPrimitiveTypeName.INT64))
     case TimestampType => Some(ParquetTypeInfo(ParquetPrimitiveTypeName.INT96))
     case DecimalType.Fixed(precision, scale) if precision <= 18 =>
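
Together these two hunks make the mapping symmetric: an INT32 column annotated with the DATE original type reads back as DateType, and a DateType field writes out as an annotated INT32. A sketch of the resulting Parquet field, built with parquet-mr's Types builder (assuming the pre-org.apache `parquet` package this code base uses, and a hypothetical field name "d"):

    import parquet.schema.{OriginalType, PrimitiveType, Types}
    import parquet.schema.PrimitiveType.PrimitiveTypeName

    // Prints as: optional int32 d (DATE)
    val dateField: PrimitiveType =
      Types.optional(PrimitiveTypeName.INT32).as(OriginalType.DATE).named("d")
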
@@ -135,6 +135,21 @@ class ParquetIOSuiteBase extends QueryTest with ParquetTest {
     }
   }
 
+  test("date type") {
+    def makeDateRDD(): DataFrame =
+      sparkContext
+        .parallelize(0 to 1000)
+        .map(i => Tuple1(DateUtils.toJavaDate(i)))
+        .toDF()
+        .select($"_1")
+
+    withTempPath { dir =>
+      val data = makeDateRDD()
+      data.saveAsParquetFile(dir.getCanonicalPath)
+      checkAnswer(parquetFile(dir.getCanonicalPath), data.collect().toSeq)
+    }
+  }
+
   test("map") {
     val data = (1 to 4).map(i => Tuple1(Map(i -> s"val_$i")))
     checkParquetFile(data)
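
The round-trip test covers day counts 0 through 1000, i.e. 1970-01-01 through 1972-09-27, which includes the 1972 leap day. Spot-checking the endpoints with the day-count sketch from earlier (output assumes a JVM time zone at or near UTC):

    println(DaysSinceEpochSketch.toJavaDate(0))    // 1970-01-01
    println(DaysSinceEpochSketch.toJavaDate(1000)) // 1972-09-27
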
@@ -57,14 +57,15 @@ class ParquetSchemaSuite extends FunSuite with ParquetTest {
       |}
     """.stripMargin)
 
-  testSchema[(Byte, Short, Int, Long)](
+  testSchema[(Byte, Short, Int, Long, java.sql.Date)](
     "logical integral types",
     """
       |message root {
       |  required int32 _1 (INT_8);
       |  required int32 _2 (INT_16);
       |  required int32 _3 (INT_32);
       |  required int64 _4 (INT_64);
+      |  optional int32 _5 (DATE);
       |}
     """.stripMargin)
 
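One detail in the expected schema: _5 is optional while _1 through _4 are required. As I read the schema-inference rules, primitive (AnyVal) tuple fields are inferred as non-nullable while reference types such as java.sql.Date are nullable, and nullable Catalyst fields map to optional Parquet fields:

    // Inferred nullability (illustrative):
    case class Record(i: Int, d: java.sql.Date)
    // i: IntegerType, nullable = false -> required int32 i (INT_32)
    // d: DateType,    nullable = true  -> optional int32 d (DATE)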