
Commit dc8b489

Add int->double + more tests

1 parent cb1487e

3 files changed (+51 −5 lines)

sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetVectorUpdaterFactory.java

Lines changed: 37 additions & 0 deletions
@@ -89,6 +89,8 @@ public ParquetVectorUpdater getUpdater(ColumnDescriptor descriptor, DataType sparkType) {
           return new ByteUpdater();
         } else if (sparkType == DataTypes.ShortType) {
           return new ShortUpdater();
+        } else if (sparkType == DataTypes.DoubleType) {
+          return new IntegerToDoubleUpdater();
         } else if (sparkType == DataTypes.DateType) {
           if ("CORRECTED".equals(datetimeRebaseMode)) {
             return new IntegerUpdater();
@@ -331,6 +333,41 @@ public void decodeSingleDictionaryId(
     }
   }
 
+  static class IntegerToDoubleUpdater implements ParquetVectorUpdater {
+    @Override
+    public void readValues(
+        int total,
+        int offset,
+        WritableColumnVector values,
+        VectorizedValuesReader valuesReader) {
+      for (int i = 0; i < total; ++i) {
+        values.putDouble(offset + i, valuesReader.readInteger());
+      }
+    }
+
+    @Override
+    public void skipValues(int total, VectorizedValuesReader valuesReader) {
+      valuesReader.skipIntegers(total);
+    }
+
+    @Override
+    public void readValue(
+        int offset,
+        WritableColumnVector values,
+        VectorizedValuesReader valuesReader) {
+      values.putDouble(offset, valuesReader.readInteger());
+    }
+
+    @Override
+    public void decodeSingleDictionaryId(
+        int offset,
+        WritableColumnVector values,
+        WritableColumnVector dictionaryIds,
+        Dictionary dictionary) {
+      values.putDouble(offset, dictionary.decodeToInt(dictionaryIds.getDictId(offset)));
+    }
+  }
+
   static class DateToTimestampNTZUpdater implements ParquetVectorUpdater {
     @Override
     public void readValues(
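For context: on the vectorized read path, getUpdater selects a ParquetVectorUpdater from the Parquet physical type and the requested Spark type, and the new IntegerToDoubleUpdater widens each INT32 value as it is copied into the double column vector. A minimal end-to-end sketch of what this enables, in Scala (the path and column name are illustrative, not part of this commit):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{DoubleType, StructField, StructType}

val spark = SparkSession.builder().master("local[1]").appName("int-to-double").getOrCreate()
import spark.implicits._

// Write a Parquet file whose column `id` has physical type INT32.
Seq(1, 2, Int.MinValue).toDF("id").write.mode("overwrite").parquet("/tmp/ints")

// Read it back with a wider schema: INT32 values are widened to DoubleType.
val widened = StructType(Seq(StructField("id", DoubleType)))
spark.read.schema(widened).parquet("/tmp/ints").show()
// 1.0, 2.0, -2.147483648E9 -- every Int is exactly representable as a Double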

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala

Lines changed: 5 additions & 0 deletions
@@ -318,6 +318,11 @@ private[parquet] class ParquetRowConverter(
           override def addInt(value: Int): Unit =
             this.updater.setLong(value)
         }
+      case DoubleType if parquetType.asPrimitiveType().getPrimitiveTypeName == INT32 =>
+        new ParquetPrimitiveConverter(updater) {
+          override def addInt(value: Int): Unit =
+            this.updater.setDouble(value)
+        }
       case DoubleType if parquetType.asPrimitiveType().getPrimitiveTypeName == FLOAT =>
         new ParquetPrimitiveConverter(updater) {
           override def addFloat(value: Float): Unit =
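This Scala converter covers the row-based (non-vectorized) reader, which Spark falls back to for nested schemas or when vectorization is disabled; addInt forwards the widened value through updater.setDouble. A short sketch of exercising that path (the config key is Spark's real flag; the data reuses the example above):

// Force the row-based reader so ParquetRowConverter performs the widening.
spark.conf.set("spark.sql.parquet.enableVectorizedReader", "false")
spark.read.schema(widened).parquet("/tmp/ints").collect()
// addInt(value) now calls this.updater.setDouble(value) for the DoubleType column.
spark.conf.set("spark.sql.parquet.enableVectorizedReader", "true")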

sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTypeWideningSuite.scala

Lines changed: 9 additions & 5 deletions
@@ -145,8 +145,11 @@ class ParquetTypeWideningSuite
       // Int->Short isn't a widening conversion but Parquet stores both as INT32 so it just works.
       (Seq("1", "2", Short.MinValue.toString), IntegerType, ShortType),
       (Seq("1", "2", Int.MinValue.toString), IntegerType, LongType),
-      (Seq("2020-01-01", "2020-01-02", "1312-02-27"), DateType, TimestampNTZType),
-      (Seq("1.23", "10.34"), FloatType, DoubleType))
+      (Seq("1", "2", Short.MinValue.toString), ShortType, DoubleType),
+      (Seq("1", "2", Int.MinValue.toString), IntegerType, DoubleType),
+      (Seq("1.23", "10.34"), FloatType, DoubleType),
+      (Seq("2020-01-01", "2020-01-02", "1312-02-27"), DateType, TimestampNTZType)
+    )
   }
   test(s"parquet widening conversion $fromType -> $toType") {
     checkAllParquetReaders(values, fromType, toType, expectError = false)
@@ -155,9 +158,10 @@ class ParquetTypeWideningSuite
   for {
     (values: Seq[String], fromType: DataType, toType: DataType) <- Seq(
       (Seq("1", "2", Int.MinValue.toString), LongType, IntegerType),
-      // Test different timestamp types
-      (Seq("2020-01-01", "2020-01-02", "1312-02-27"), TimestampNTZType, DateType),
-      (Seq("1.23", "10.34"), DoubleType, FloatType))
+      (Seq("1.23", "10.34"), DoubleType, FloatType),
+      (Seq("1.23", "10.34"), FloatType, LongType),
+      (Seq("2020-01-01", "2020-01-02", "1312-02-27"), TimestampNTZType, DateType)
+    )
   }
   test(s"unsupported parquet conversion $fromType -> $toType") {
     checkAllParquetReaders(values, fromType, toType, expectError = true)
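The suite writes each value list at fromType, reads it back at toType under every reader configuration, and expects an error only for narrowing cases such as the new FloatType -> LongType entry. A standalone approximation of the new Int -> Double assertion, assuming a local SparkSession (the path and structure here are illustrative, not the suite's actual checkAllParquetReaders utilities):

import org.apache.spark.sql.types.{DoubleType, StructField, StructType}

Seq(1, 2, Int.MinValue).toDF("col").write.mode("overwrite").parquet("/tmp/widening")
val doubles = spark.read
  .schema(StructType(Seq(StructField("col", DoubleType))))
  .parquet("/tmp/widening")
  .collect()
  .map(_.getDouble(0))
  .sorted
// Widened values round-trip exactly; Int.MinValue sorts first as a Double.
assert(doubles.sameElements(Array(Int.MinValue.toDouble, 1.0, 2.0)))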
