Commit b90c020

sameeragarwal authored and yhuai committed
[SPARK-13922][SQL] Filter rows with null attributes in vectorized parquet reader
## What changes were proposed in this pull request?

It's common for many SQL operators not to care about reading `null` values for correctness. Currently, this is achieved by performing `isNotNull` checks (for all relevant columns) on a per-row basis. Pushing these null filters into the vectorized parquet reader should bring considerable benefits, especially when the underlying data contains no nulls or contains all nulls.

## How was this patch tested?

Intel(R) Core(TM) i7-4960HQ CPU @ 2.60GHz
String with Nulls Scan (0%):        Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
-------------------------------------------------------------------------------------------
SQL Parquet Vectorized                    1229 / 1648          8.5         117.2       1.0X
PR Vectorized                              833 /  846         12.6          79.4       1.5X
PR Vectorized (Null Filtering)             732 /  782         14.3          69.8       1.7X

Intel(R) Core(TM) i7-4960HQ CPU @ 2.60GHz
String with Nulls Scan (50%):       Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
-------------------------------------------------------------------------------------------
SQL Parquet Vectorized                     995 / 1053         10.5          94.9       1.0X
PR Vectorized                              732 /  772         14.3          69.8       1.4X
PR Vectorized (Null Filtering)             725 /  790         14.5          69.1       1.4X

Intel(R) Core(TM) i7-4960HQ CPU @ 2.60GHz
String with Nulls Scan (95%):       Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
-------------------------------------------------------------------------------------------
SQL Parquet Vectorized                     326 /  333         32.2          31.1       1.0X
PR Vectorized                              190 /  200         55.1          18.2       1.7X
PR Vectorized (Null Filtering)             168 /  172         62.2          16.1       1.9X

Author: Sameer Agarwal <[email protected]>

Closes #11749 from sameeragarwal/perf-testing.
1 parent 4ce2d24 · commit b90c020

File tree

3 files changed: +146 −5 lines

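Before the per-file diffs, here is how the pieces fit together from a caller's point of view. This is a minimal sketch assembled from the benchmark this patch adds, not additional commit content: the parquet path is a placeholder, and the import location of UnsafeRowParquetRecordReader is assumed from the benchmark's package.

import scala.collection.JavaConverters._
import org.apache.spark.sql.execution.datasources.parquet.UnsafeRowParquetRecordReader

// Declare c1 and c2 as non-nullable before iterating; the batch's row
// iterator then skips any row with a null in either column, so no
// per-row isNullAt() checks are needed in the hot loop.
val reader = new UnsafeRowParquetRecordReader
try {
  reader.initialize("/path/to/data.parquet", ("c1" :: "c2" :: Nil).asJava) // placeholder path
  val batch = reader.resultBatch()
  batch.filterNullsInColumn(0) // c1 IS NOT NULL
  batch.filterNullsInColumn(1) // c2 IS NOT NULL
  var sum = 0L
  while (reader.nextBatch()) {
    val it = batch.rowIterator()
    while (it.hasNext) {
      sum += it.next().getUTF8String(0).numBytes()
    }
  }
} finally {
  reader.close()
}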

sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnarBatch.java

Lines changed: 27 additions & 5 deletions

@@ -16,9 +16,7 @@
  */
 package org.apache.spark.sql.execution.vectorized;
 
-import java.util.Arrays;
-import java.util.Iterator;
-import java.util.NoSuchElementException;
+import java.util.*;
 
 import org.apache.commons.lang.NotImplementedException;
 
@@ -58,6 +56,9 @@ public final class ColumnarBatch {
   // True if the row is filtered.
   private final boolean[] filteredRows;
 
+  // Column indices that cannot have null values.
+  private final Set<Integer> nullFilteredColumns;
+
   // Total number of rows that have been filtered.
   private int numRowsFiltered = 0;
 
@@ -284,11 +285,23 @@ public void reset() {
   }
 
   /**
-   * Sets the number of rows that are valid.
+   * Sets the number of rows that are valid. Additionally, marks all rows as "filtered" if one or
+   * more of their attributes are part of a non-nullable column.
    */
   public void setNumRows(int numRows) {
     assert(numRows <= this.capacity);
     this.numRows = numRows;
+
+    for (int ordinal : nullFilteredColumns) {
+      if (columns[ordinal].numNulls != 0) {
+        for (int rowId = 0; rowId < numRows; rowId++) {
+          if (!filteredRows[rowId] && columns[ordinal].getIsNull(rowId)) {
+            filteredRows[rowId] = true;
+            ++numRowsFiltered;
+          }
+        }
+      }
+    }
   }
 
   /**
@@ -345,15 +358,24 @@ public ColumnarBatch.Row getRow(int rowId) {
    * in this batch will not include this row.
    */
   public final void markFiltered(int rowId) {
-    assert(filteredRows[rowId] == false);
+    assert(!filteredRows[rowId]);
    filteredRows[rowId] = true;
     ++numRowsFiltered;
   }
 
+  /**
+   * Marks a given column as non-nullable. Any row that has a NULL value for the corresponding
+   * attribute is filtered out.
+   */
+  public final void filterNullsInColumn(int ordinal) {
+    nullFilteredColumns.add(ordinal);
+  }
+
   private ColumnarBatch(StructType schema, int maxRows, MemoryMode memMode) {
     this.schema = schema;
     this.capacity = maxRows;
     this.columns = new ColumnVector[schema.size()];
+    this.nullFilteredColumns = new HashSet<>();
     this.filteredRows = new boolean[maxRows];
 
     for (int i = 0; i < schema.fields().length; ++i) {
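The filtering pass in setNumRows() above is the core of the change. To make its semantics concrete, here is a small self-contained Scala sketch of the same loop outside of Spark; the isNull arrays are hypothetical stand-ins for the per-column null tracking (numNulls / getIsNull) that ColumnVector provides, and the final filter stands in for the row iterator, which per markFiltered's contract never surfaces filtered rows.

object NullFilterSketch {
  def main(args: Array[String]): Unit = {
    val numRows = 5
    // Hypothetical null bitmaps: column 1 has nulls at rows 0 and 3.
    val isNull = Array(
      Array(false, false, false, false, false), // column 0
      Array(true, false, false, true, false))   // column 1
    val nullFilteredColumns = Set(1)            // i.e. filterNullsInColumn(1)
    val filteredRows = Array.fill(numRows)(false)
    var numRowsFiltered = 0

    // Mirrors the loop in setNumRows(): mark each row that is null in a
    // null-filtered column, counting every row at most once.
    for (ordinal <- nullFilteredColumns; rowId <- 0 until numRows) {
      if (!filteredRows(rowId) && isNull(ordinal)(rowId)) {
        filteredRows(rowId) = true
        numRowsFiltered += 1
      }
    }

    // The row iterator skips filtered rows, so only rows 1, 2 and 4 survive.
    val surviving = (0 until numRows).filterNot(i => filteredRows(i))
    println(s"filtered $numRowsFiltered rows; surviving: $surviving")
    // prints: filtered 2 rows; surviving: Vector(1, 2, 4)
  }
}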

sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadBenchmark.scala

Lines changed: 90 additions & 0 deletions

@@ -299,10 +299,100 @@ object ParquetReadBenchmark {
     }
   }
 
+  def stringWithNullsScanBenchmark(values: Int, fractionOfNulls: Double): Unit = {
+    withTempPath { dir =>
+      withTempTable("t1", "tempTable") {
+        sqlContext.range(values).registerTempTable("t1")
+        sqlContext.sql(s"select IF(rand(1) < $fractionOfNulls, NULL, cast(id as STRING)) as c1, " +
+          s"IF(rand(2) < $fractionOfNulls, NULL, cast(id as STRING)) as c2 from t1")
+          .write.parquet(dir.getCanonicalPath)
+        sqlContext.read.parquet(dir.getCanonicalPath).registerTempTable("tempTable")
+
+        val benchmark = new Benchmark("String with Nulls Scan", values)
+
+        benchmark.addCase("SQL Parquet Vectorized") { iter =>
+          sqlContext.sql("select sum(length(c2)) from tempTable where c1 is " +
+            "not NULL and c2 is not NULL").collect()
+        }
+
+        val files = SpecificParquetRecordReaderBase.listDirectory(dir).toArray
+        benchmark.addCase("PR Vectorized") { num =>
+          var sum = 0
+          files.map(_.asInstanceOf[String]).foreach { p =>
+            val reader = new UnsafeRowParquetRecordReader
+            try {
+              reader.initialize(p, ("c1" :: "c2" :: Nil).asJava)
+              val batch = reader.resultBatch()
+              while (reader.nextBatch()) {
+                val rowIterator = batch.rowIterator()
+                while (rowIterator.hasNext) {
+                  val row = rowIterator.next()
+                  val value = row.getUTF8String(0)
+                  if (!row.isNullAt(0) && !row.isNullAt(1)) sum += value.numBytes()
+                }
+              }
+            } finally {
+              reader.close()
+            }
+          }
+        }
+
+        benchmark.addCase("PR Vectorized (Null Filtering)") { num =>
+          var sum = 0L
+          files.map(_.asInstanceOf[String]).foreach { p =>
+            val reader = new UnsafeRowParquetRecordReader
+            try {
+              reader.initialize(p, ("c1" :: "c2" :: Nil).asJava)
+              val batch = reader.resultBatch()
+              batch.filterNullsInColumn(0)
+              batch.filterNullsInColumn(1)
+              while (reader.nextBatch()) {
+                val rowIterator = batch.rowIterator()
+                while (rowIterator.hasNext) {
+                  sum += rowIterator.next().getUTF8String(0).numBytes()
+                }
+              }
+            } finally {
+              reader.close()
+            }
+          }
+        }
+
+        /*
+        Intel(R) Core(TM) i7-4960HQ CPU @ 2.60GHz
+        String with Nulls Scan (0%):        Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
+        -------------------------------------------------------------------------------------------
+        SQL Parquet Vectorized                    1229 / 1648          8.5         117.2       1.0X
+        PR Vectorized                              833 /  846         12.6          79.4       1.5X
+        PR Vectorized (Null Filtering)             732 /  782         14.3          69.8       1.7X
+
+        Intel(R) Core(TM) i7-4960HQ CPU @ 2.60GHz
+        String with Nulls Scan (50%):       Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
+        -------------------------------------------------------------------------------------------
+        SQL Parquet Vectorized                     995 / 1053         10.5          94.9       1.0X
+        PR Vectorized                              732 /  772         14.3          69.8       1.4X
+        PR Vectorized (Null Filtering)             725 /  790         14.5          69.1       1.4X
+
+        Intel(R) Core(TM) i7-4960HQ CPU @ 2.60GHz
+        String with Nulls Scan (95%):       Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
+        -------------------------------------------------------------------------------------------
+        SQL Parquet Vectorized                     326 /  333         32.2          31.1       1.0X
+        PR Vectorized                              190 /  200         55.1          18.2       1.7X
+        PR Vectorized (Null Filtering)             168 /  172         62.2          16.1       1.9X
+        */
+
+        benchmark.run()
+      }
+    }
+  }
+
   def main(args: Array[String]): Unit = {
     intScanBenchmark(1024 * 1024 * 15)
     intStringScanBenchmark(1024 * 1024 * 10)
     stringDictionaryScanBenchmark(1024 * 1024 * 10)
     partitionTableScanBenchmark(1024 * 1024 * 15)
+    for (fractionOfNulls <- List(0.0, 0.50, 0.95)) {
+      stringWithNullsScanBenchmark(1024 * 1024 * 10, fractionOfNulls)
+    }
   }
 }

sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala

Lines changed: 29 additions & 0 deletions

@@ -727,4 +727,33 @@ class ColumnarBatchSuite extends SparkFunSuite {
   test("Random nested schema") {
     testRandomRows(false, 30)
   }
+
+  test("null filtered columns") {
+    val NUM_ROWS = 10
+    val schema = new StructType()
+      .add("key", IntegerType, nullable = false)
+      .add("value", StringType, nullable = true)
+    for (numNulls <- List(0, NUM_ROWS / 2, NUM_ROWS)) {
+      val rows = mutable.ArrayBuffer.empty[Row]
+      for (i <- 0 until NUM_ROWS) {
+        val row = if (i < numNulls) Row.fromSeq(Seq(i, null)) else Row.fromSeq(Seq(i, i.toString))
+        rows += row
+      }
+      (MemoryMode.ON_HEAP :: MemoryMode.OFF_HEAP :: Nil).foreach { memMode => {
+        val batch = ColumnVectorUtils.toBatch(schema, memMode, rows.iterator.asJava)
+        batch.filterNullsInColumn(1)
+        batch.setNumRows(NUM_ROWS)
+        assert(batch.numRows() == NUM_ROWS)
+        val it = batch.rowIterator()
+        // Top numNulls rows should be filtered
+        var k = numNulls
+        while (it.hasNext) {
+          assert(it.next().getInt(0) == k)
+          k += 1
+        }
+        assert(k == NUM_ROWS)
+        batch.close()
+      }}
+    }
+  }
 }
