
Commit a418548

tinhto-000 authored and cloud-fan committed
[SPARK-31703][SQL] Parquet RLE float/double are read incorrectly on big endian platforms
### What changes were proposed in this pull request?

This PR fixes an issue introduced by SPARK-26985, which changed the `putDoubles()` and `putFloats()` methods to respect the platform's endianness. As a result, the RLE paths in `VectorizedRleValuesReader.java` read the RLE entries in parquet as BIG_ENDIAN on big-endian platforms (i.e., as-is), even though parquet data is always little endian. The comments in `WritableColumnVector.java` state that those methods are for "ieee formatted doubles in platform native endian" (or floats), so using them for parquet data, which is always little endian, is inappropriate.

To demonstrate the problem with spark-shell:

```scala
import org.apache.spark._
import org.apache.spark.sql._
import org.apache.spark.sql.types._

var data = Seq(
  (1.0, 0.1),
  (2.0, 0.2),
  (0.3, 3.0),
  (4.0, 4.0),
  (5.0, 5.0))
var df = spark.createDataFrame(data).write.mode(SaveMode.Overwrite).parquet("/tmp/data.parquet2")
var df2 = spark.read.parquet("/tmp/data.parquet2")
df2.show()
```

Result on a big-endian platform:

```
+--------------------+--------------------+
|                  _1|                  _2|
+--------------------+--------------------+
|           3.16E-322|-1.54234871366845...|
|         2.0553E-320|         2.0553E-320|
|          2.561E-320|          2.561E-320|
|4.66726145843124E-62|         1.0435E-320|
|        3.03865E-319|-1.54234871366757...|
+--------------------+--------------------+
```

Tests in `ParquetIOSuite` that involve float/double data also fail on big-endian platforms, e.g.:

- basic data types (without binary)
- read raw Parquet file

`/examples/src/main/python/mllib/isotonic_regression_example.py` fails as well.

The proposed change adds `putDoublesLittleEndian()` and `putFloatsLittleEndian()` methods for parquet to invoke, mirroring the existing `putIntsLittleEndian()` and `putLongsLittleEndian()`. On little-endian platforms they delegate to `putDoubles()` and `putFloats()`; on big-endian platforms they read the entries as little endian, as was done before SPARK-26985. Unit tests for the new methods are added to `ColumnarBatchSuite`; the existing tests cover the rest.

### Why are the changes needed?

RLE float/double data in parquet files is not read back correctly on big-endian platforms.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

All unit tests (`mvn test`) were run and passed.

Closes #29383 from tinhto-000/SPARK-31703.

Authored-by: Tin Hang To <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
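To make the failure mode concrete, here is a minimal standalone Java sketch (illustrative, not part of the patch) of what happens when little-endian IEEE 754 bytes are decoded in big-endian order:

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

public class EndianMismatchDemo {
    public static void main(String[] args) {
        // Parquet always stores doubles as little-endian IEEE 754 bytes.
        byte[] bytes = new byte[8];
        ByteBuffer.wrap(bytes).order(ByteOrder.LITTLE_ENDIAN).putDouble(0, 1.0);

        // Correct: decode with the writer's (little-endian) byte order.
        double ok = ByteBuffer.wrap(bytes).order(ByteOrder.LITTLE_ENDIAN).getDouble(0);

        // The bug: on a big-endian platform the bytes were taken as-is,
        // i.e. decoded in big-endian order.
        double bad = ByteBuffer.wrap(bytes).order(ByteOrder.BIG_ENDIAN).getDouble(0);

        System.out.println(ok);  // 1.0
        System.out.println(bad); // a denormal, roughly 3.03865E-319
    }
}
```

Decoding the little-endian bytes of `1.0` in big-endian order yields a denormal of roughly `3.03865E-319`, matching one of the corrupted values in the table above.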
1 parent 4cf8c1d commit a418548

5 files changed: +166 −16 lines changed


sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java

Lines changed: 2 additions & 2 deletions

```diff
@@ -169,7 +169,7 @@ public final void readFloats(int total, WritableColumnVector c, int rowId) {
 
     if (buffer.hasArray()) {
       int offset = buffer.arrayOffset() + buffer.position();
-      c.putFloats(rowId, total, buffer.array(), offset);
+      c.putFloatsLittleEndian(rowId, total, buffer.array(), offset);
     } else {
       for (int i = 0; i < total; i += 1) {
         c.putFloat(rowId + i, buffer.getFloat());
@@ -184,7 +184,7 @@ public final void readDoubles(int total, WritableColumnVector c, int rowId) {
 
     if (buffer.hasArray()) {
       int offset = buffer.arrayOffset() + buffer.position();
-      c.putDoubles(rowId, total, buffer.array(), offset);
+      c.putDoublesLittleEndian(rowId, total, buffer.array(), offset);
     } else {
       for (int i = 0; i < total; i += 1) {
         c.putDouble(rowId + i, buffer.getDouble());
```

sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java

Lines changed: 26 additions & 0 deletions

```diff
@@ -425,6 +425,19 @@ public void putFloats(int rowId, int count, byte[] src, int srcIndex) {
     }
   }
 
+  @Override
+  public void putFloatsLittleEndian(int rowId, int count, byte[] src, int srcIndex) {
+    if (!bigEndianPlatform) {
+      putFloats(rowId, count, src, srcIndex);
+    } else {
+      ByteBuffer bb = ByteBuffer.wrap(src).order(ByteOrder.LITTLE_ENDIAN);
+      long offset = data + 4L * rowId;
+      for (int i = 0; i < count; ++i, offset += 4) {
+        Platform.putFloat(null, offset, bb.getFloat(srcIndex + (4 * i)));
+      }
+    }
+  }
+
   @Override
   public float getFloat(int rowId) {
     if (dictionary == null) {
@@ -480,6 +493,19 @@ public void putDoubles(int rowId, int count, byte[] src, int srcIndex) {
     }
   }
 
+  @Override
+  public void putDoublesLittleEndian(int rowId, int count, byte[] src, int srcIndex) {
+    if (!bigEndianPlatform) {
+      putDoubles(rowId, count, src, srcIndex);
+    } else {
+      ByteBuffer bb = ByteBuffer.wrap(src).order(ByteOrder.LITTLE_ENDIAN);
+      long offset = data + 8L * rowId;
+      for (int i = 0; i < count; ++i, offset += 8) {
+        Platform.putDouble(null, offset, bb.getDouble(srcIndex + (8 * i)));
+      }
+    }
+  }
+
   @Override
   public double getDouble(int rowId) {
     if (dictionary == null) {
```
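For reference, a standalone sketch (not from the patch) of the decode pattern shared by the big-endian branches above and in `OnHeapColumnVector` below: absolute indexed reads on an explicitly little-endian `ByteBuffer`. The on-heap `float[]` here is an illustrative stand-in for the off-heap `Platform.putFloat` writes.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

public class LittleEndianDecode {
    static void putFloatsLittleEndian(float[] dst, int rowId, int count, byte[] src, int srcIndex) {
        // Wrap once; absolute getFloat(index) honors the buffer's byte order on any platform.
        ByteBuffer bb = ByteBuffer.wrap(src).order(ByteOrder.LITTLE_ENDIAN);
        for (int i = 0; i < count; ++i) {
            dst[rowId + i] = bb.getFloat(srcIndex + 4 * i);
        }
    }

    public static void main(String[] args) {
        // Little-endian source bytes, as a parquet page would supply them.
        byte[] src = new byte[8];
        ByteBuffer.wrap(src).order(ByteOrder.LITTLE_ENDIAN).putFloat(0, 1.357f).putFloat(4, 2.468f);

        float[] dst = new float[2];
        putFloatsLittleEndian(dst, 0, 2, src, 0);
        System.out.println(dst[0] + " " + dst[1]); // 1.357 2.468 on any platform
    }
}
```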

sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java

Lines changed: 24 additions & 0 deletions

```diff
@@ -403,6 +403,18 @@ public void putFloats(int rowId, int count, byte[] src, int srcIndex) {
     }
   }
 
+  @Override
+  public void putFloatsLittleEndian(int rowId, int count, byte[] src, int srcIndex) {
+    if (!bigEndianPlatform) {
+      putFloats(rowId, count, src, srcIndex);
+    } else {
+      ByteBuffer bb = ByteBuffer.wrap(src).order(ByteOrder.LITTLE_ENDIAN);
+      for (int i = 0; i < count; ++i) {
+        floatData[i + rowId] = bb.getFloat(srcIndex + (4 * i));
+      }
+    }
+  }
+
   @Override
   public float getFloat(int rowId) {
     if (dictionary == null) {
@@ -452,6 +464,18 @@ public void putDoubles(int rowId, int count, byte[] src, int srcIndex) {
     }
   }
 
+  @Override
+  public void putDoublesLittleEndian(int rowId, int count, byte[] src, int srcIndex) {
+    if (!bigEndianPlatform) {
+      putDoubles(rowId, count, src, srcIndex);
+    } else {
+      ByteBuffer bb = ByteBuffer.wrap(src).order(ByteOrder.LITTLE_ENDIAN);
+      for (int i = 0; i < count; ++i) {
+        doubleData[i + rowId] = bb.getDouble(srcIndex + (8 * i));
+      }
+    }
+  }
+
   @Override
   public double getDouble(int rowId) {
     if (dictionary == null) {
```

sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/WritableColumnVector.java

Lines changed: 12 additions & 0 deletions

```diff
@@ -312,6 +312,12 @@ public WritableColumnVector reserveDictionaryIds(int capacity) {
    */
   public abstract void putFloats(int rowId, int count, byte[] src, int srcIndex);
 
+  /**
+   * Sets values from [src[srcIndex], src[srcIndex + count * 4]) to [rowId, rowId + count)
+   * The data in src must be ieee formatted floats in little endian.
+   */
+  public abstract void putFloatsLittleEndian(int rowId, int count, byte[] src, int srcIndex);
+
   /**
    * Sets `value` to the value at rowId.
    */
@@ -333,6 +339,12 @@ public WritableColumnVector reserveDictionaryIds(int capacity) {
    */
   public abstract void putDoubles(int rowId, int count, byte[] src, int srcIndex);
 
+  /**
+   * Sets values from [src[srcIndex], src[srcIndex + count * 8]) to [rowId, rowId + count)
+   * The data in src must be ieee formatted doubles in little endian.
+   */
+  public abstract void putDoublesLittleEndian(int rowId, int count, byte[] src, int srcIndex);
+
   /**
    * Puts a byte array that already exists in this column.
    */
```
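A usage sketch of the new API from the caller's side (illustrative, not from the patch; the `OnHeapColumnVector` setup and values mirror the new `ColumnarBatchSuite` tests below):

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

import org.apache.spark.sql.execution.vectorized.OnHeapColumnVector;
import org.apache.spark.sql.types.DataTypes;

public class PutDoublesLittleEndianDemo {
    public static void main(String[] args) {
        // Source bytes in little-endian order, as a parquet page would supply them.
        byte[] src = new byte[16];
        ByteBuffer bb = ByteBuffer.wrap(src).order(ByteOrder.LITTLE_ENDIAN);
        bb.putDouble(0, 1.357);
        bb.putDouble(8, 2.468);

        // The new method decodes correctly on both little- and big-endian platforms.
        OnHeapColumnVector vec = new OnHeapColumnVector(2, DataTypes.DoubleType);
        vec.putDoublesLittleEndian(0, 2, src, 0);
        System.out.println(vec.getDouble(0)); // 1.357
        System.out.println(vec.getDouble(1)); // 2.468
        vec.close();
    }
}
```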

sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala

Lines changed: 102 additions & 14 deletions

```diff
@@ -447,13 +447,6 @@ class ColumnarBatchSuite extends SparkFunSuite {
       Platform.putFloat(buffer, Platform.BYTE_ARRAY_OFFSET, 2.234f)
       Platform.putFloat(buffer, Platform.BYTE_ARRAY_OFFSET + 4, 1.123f)
 
-      if (ByteOrder.nativeOrder().equals(ByteOrder.BIG_ENDIAN)) {
-        // Ensure array contains Little Endian floats
-        val bb = ByteBuffer.wrap(buffer).order(ByteOrder.LITTLE_ENDIAN)
-        Platform.putFloat(buffer, Platform.BYTE_ARRAY_OFFSET, bb.getFloat(0))
-        Platform.putFloat(buffer, Platform.BYTE_ARRAY_OFFSET + 4, bb.getFloat(4))
-      }
-
       column.putFloats(idx, 1, buffer, 4)
       column.putFloats(idx + 1, 1, buffer, 0)
       reference += 1.123f
@@ -491,6 +484,57 @@
       }
   }
 
+  testVector("[SPARK-31703] Float API - Little Endian", 1024, FloatType) {
+    column =>
+      val seed = System.currentTimeMillis()
+      val random = new Random(seed)
+      val reference = mutable.ArrayBuffer.empty[Float]
+
+      var idx = 0
+
+      val littleEndian = ByteBuffer.allocate(8).order(ByteOrder.LITTLE_ENDIAN)
+      littleEndian.putFloat(0, 1.357f)
+      littleEndian.putFloat(4, 2.468f)
+      val arr = new Array[Byte](littleEndian.remaining)
+      littleEndian.get(arr)
+
+      column.putFloatsLittleEndian(idx, 1, arr, 4)
+      column.putFloatsLittleEndian(idx + 1, 1, arr, 0)
+      reference += 2.468f
+      reference += 1.357f
+      idx += 2
+
+      column.putFloatsLittleEndian(idx, 2, arr, 0)
+      reference += 1.357f
+      reference += 2.468f
+      idx += 2
+
+      while (idx < column.capacity) {
+        val single = random.nextBoolean()
+        if (single) {
+          val v = random.nextFloat()
+          column.putFloat(idx, v)
+          reference += v
+          idx += 1
+        } else {
+          val n = math.min(random.nextInt(column.capacity / 20), column.capacity - idx)
+          val v = random.nextFloat()
+          column.putFloats(idx, n, v)
+          var i = 0
+          while (i < n) {
+            reference += v
+            i += 1
+          }
+          idx += n
+        }
+      }
+
+      reference.zipWithIndex.foreach { v =>
+        assert(v._1 == column.getFloat(v._2),
+          "Seed = " + seed + " VectorType=" + column.getClass.getSimpleName)
+      }
+  }
+
   testVector("Double APIs", 1024, DoubleType) {
     column =>
       val seed = System.currentTimeMillis()
@@ -531,13 +575,6 @@
       Platform.putDouble(buffer, Platform.BYTE_ARRAY_OFFSET, 2.234)
       Platform.putDouble(buffer, Platform.BYTE_ARRAY_OFFSET + 8, 1.123)
 
-      if (ByteOrder.nativeOrder().equals(ByteOrder.BIG_ENDIAN)) {
-        // Ensure array contains Little Endian doubles
-        val bb = ByteBuffer.wrap(buffer).order(ByteOrder.LITTLE_ENDIAN)
-        Platform.putDouble(buffer, Platform.BYTE_ARRAY_OFFSET, bb.getDouble(0))
-        Platform.putDouble(buffer, Platform.BYTE_ARRAY_OFFSET + 8, bb.getDouble(8))
-      }
-
       column.putDoubles(idx, 1, buffer, 8)
       column.putDoubles(idx + 1, 1, buffer, 0)
       reference += 1.123
@@ -575,6 +612,57 @@
       }
   }
 
+  testVector("[SPARK-31703] Double API - Little Endian", 1024, DoubleType) {
+    column =>
+      val seed = System.currentTimeMillis()
+      val random = new Random(seed)
+      val reference = mutable.ArrayBuffer.empty[Double]
+
+      var idx = 0
+
+      val littleEndian = ByteBuffer.allocate(16).order(ByteOrder.LITTLE_ENDIAN)
+      littleEndian.putDouble(0, 1.357)
+      littleEndian.putDouble(8, 2.468)
+      val arr = new Array[Byte](littleEndian.remaining)
+      littleEndian.get(arr)
+
+      column.putDoublesLittleEndian(idx, 1, arr, 8)
+      column.putDoublesLittleEndian(idx + 1, 1, arr, 0)
+      reference += 2.468
+      reference += 1.357
+      idx += 2
+
+      column.putDoublesLittleEndian(idx, 2, arr, 0)
+      reference += 1.357
+      reference += 2.468
+      idx += 2
+
+      while (idx < column.capacity) {
+        val single = random.nextBoolean()
+        if (single) {
+          val v = random.nextDouble()
+          column.putDouble(idx, v)
+          reference += v
+          idx += 1
+        } else {
+          val n = math.min(random.nextInt(column.capacity / 20), column.capacity - idx)
+          val v = random.nextDouble()
+          column.putDoubles(idx, n, v)
+          var i = 0
+          while (i < n) {
+            reference += v
+            i += 1
+          }
+          idx += n
+        }
+      }
+
+      reference.zipWithIndex.foreach { v =>
+        assert(v._1 == column.getDouble(v._2),
+          "Seed = " + seed + " VectorType=" + column.getClass.getSimpleName)
+      }
+  }
+
   testVector("String APIs", 7, StringType) {
     column =>
       val reference = mutable.ArrayBuffer.empty[String]
```
