From 799f42957759cb923f16a57347847ce97d5216cc Mon Sep 17 00:00:00 2001 From: Gengliang Wang Date: Wed, 26 Dec 2018 20:15:18 +0800 Subject: [PATCH 1/6] refactor --- .../orc/OrcColumnarBatchReader.java | 81 ++++++++++--------- .../datasources/orc/OrcFileFormat.scala | 12 +-- 2 files changed, 51 insertions(+), 42 deletions(-) diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcColumnarBatchReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcColumnarBatchReader.java index a0d9578a377b..684e543fecdb 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcColumnarBatchReader.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcColumnarBatchReader.java @@ -58,10 +58,16 @@ public class OrcColumnarBatchReader extends RecordReader { /** * The column IDs of the physical ORC file schema which are required by this reader. - * -1 means this required column doesn't exist in the ORC file. + * -1 means this required column is partition column, or it doesn't exist in the ORC file. */ private int[] requestedColIds; + /** + * The column IDs of the ORC file partition schema which are required by this reader. + * -1 means this required column doesn't exist in the ORC partition columns. + */ + private int[] requestedPartitionColIds; + // Record reader from ORC row batch. private org.apache.orc.RecordReader recordReader; @@ -143,25 +149,34 @@ public void initialize( /** * Initialize columnar batch by setting required schema and partition information. * With this information, this creates ColumnarBatch with the full schema. + * + * @param orcSchema Schema from ORC file reader. + * @param requiredFields All the fields that are required to return, including partition fields. + * @param requestedColIds Requested column ids from orcSchema. -1 if not existed. + * @param requestedPartitionColIds Requested column ids from partition schema. -1 if not existed. + * @param partitionValues Values of partition columns. */ public void initBatch( TypeDescription orcSchema, - int[] requestedColIds, StructField[] requiredFields, - StructType partitionSchema, + int[] requestedColIds, + int[] requestedPartitionColIds, InternalRow partitionValues) { batch = orcSchema.createRowBatch(capacity); assert(!batch.selectedInUse); // `selectedInUse` should be initialized with `false`. - + assert(requiredFields.length == requestedColIds.length); + assert(requiredFields.length == requestedPartitionColIds.length); + // If a required column is also partition column, use partition value and don't read from file. + for (int i = 0; i < requiredFields.length; i++) { + if (requestedPartitionColIds[i] != -1) { + requestedColIds[i] = -1; + } + } + this.requestedPartitionColIds = requestedPartitionColIds; this.requiredFields = requiredFields; this.requestedColIds = requestedColIds; - assert(requiredFields.length == requestedColIds.length); StructType resultSchema = new StructType(requiredFields); - for (StructField f : partitionSchema.fields()) { - resultSchema = resultSchema.add(f); - } - if (copyToSpark) { if (MEMORY_MODE == MemoryMode.OFF_HEAP) { columnVectors = OffHeapColumnVector.allocateColumns(capacity, resultSchema); @@ -169,22 +184,18 @@ public void initBatch( columnVectors = OnHeapColumnVector.allocateColumns(capacity, resultSchema); } - // Initialize the missing columns once. + // Initialize the partition columns and missing columns once. 
for (int i = 0; i < requiredFields.length; i++) { - if (requestedColIds[i] == -1) { + if (requestedPartitionColIds[i] != -1) { + ColumnVectorUtils.populate(columnVectors[i], + partitionValues, requestedPartitionColIds[i]); + columnVectors[i].setIsConstant(); + } else if (requestedColIds[i] == -1) { columnVectors[i].putNulls(0, capacity); columnVectors[i].setIsConstant(); } } - if (partitionValues.numFields() > 0) { - int partitionIdx = requiredFields.length; - for (int i = 0; i < partitionValues.numFields(); i++) { - ColumnVectorUtils.populate(columnVectors[i + partitionIdx], partitionValues, i); - columnVectors[i + partitionIdx].setIsConstant(); - } - } - columnarBatch = new ColumnarBatch(columnVectors); } else { // Just wrap the ORC column vector instead of copying it to Spark column vector. @@ -192,26 +203,22 @@ public void initBatch( for (int i = 0; i < requiredFields.length; i++) { DataType dt = requiredFields[i].dataType(); - int colId = requestedColIds[i]; - // Initialize the missing columns once. - if (colId == -1) { - OnHeapColumnVector missingCol = new OnHeapColumnVector(capacity, dt); - missingCol.putNulls(0, capacity); - missingCol.setIsConstant(); - orcVectorWrappers[i] = missingCol; - } else { - orcVectorWrappers[i] = new OrcColumnVector(dt, batch.cols[colId]); - } - } - - if (partitionValues.numFields() > 0) { - int partitionIdx = requiredFields.length; - for (int i = 0; i < partitionValues.numFields(); i++) { - DataType dt = partitionSchema.fields()[i].dataType(); + if (requestedPartitionColIds[i] != -1) { OnHeapColumnVector partitionCol = new OnHeapColumnVector(capacity, dt); - ColumnVectorUtils.populate(partitionCol, partitionValues, i); + ColumnVectorUtils.populate(partitionCol, partitionValues, requestedPartitionColIds[i]); partitionCol.setIsConstant(); - orcVectorWrappers[partitionIdx + i] = partitionCol; + orcVectorWrappers[i] = partitionCol; + } else { + int colId = requestedColIds[i]; + // Initialize the missing columns once. + if (colId == -1) { + OnHeapColumnVector missingCol = new OnHeapColumnVector(capacity, dt); + missingCol.putNulls(0, capacity); + missingCol.setIsConstant(); + orcVectorWrappers[i] = missingCol; + } else { + orcVectorWrappers[i] = new OrcColumnVector(dt, batch.cols[colId]); + } } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala index 4574f8247af5..e2ab8010f934 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala @@ -186,8 +186,9 @@ class OrcFileFormat if (requestedColIdsOrEmptyFile.isEmpty) { Iterator.empty } else { - val requestedColIds = requestedColIdsOrEmptyFile.get - assert(requestedColIds.length == requiredSchema.length, + val requestedColIds = + requestedColIdsOrEmptyFile.get ++ Array.fill(partitionSchema.length)(-1) + assert(requestedColIds.length == resultSchema.length, "[BUG] requested column IDs do not match required schema") val taskConf = new Configuration(conf) taskConf.set(OrcConf.INCLUDE_COLUMNS.getAttribute, @@ -206,13 +207,14 @@ class OrcFileFormat // after opening a file. 
val iter = new RecordReaderIterator(batchReader) Option(TaskContext.get()).foreach(_.addTaskCompletionListener[Unit](_ => iter.close())) - + val requestedPartitionColIds = + Array.fill(requiredSchema.length)(-1) ++ Range(0, partitionSchema.length) batchReader.initialize(fileSplit, taskAttemptContext) batchReader.initBatch( reader.getSchema, + resultSchema.fields, requestedColIds, - requiredSchema.fields, - partitionSchema, + requestedPartitionColIds, file.partitionValues) iter.asInstanceOf[Iterator[InternalRow]] From 49ae28b1e29570675cfecc019d18428553d336b5 Mon Sep 17 00:00:00 2001 From: Gengliang Wang Date: Thu, 27 Dec 2018 19:18:48 +0800 Subject: [PATCH 2/6] address comments --- .../orc/OrcColumnarBatchReader.java | 33 +++++++++---------- .../datasources/orc/OrcFileFormat.scala | 8 ++--- 2 files changed, 19 insertions(+), 22 deletions(-) diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcColumnarBatchReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcColumnarBatchReader.java index 684e543fecdb..9535a1fdfdde 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcColumnarBatchReader.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcColumnarBatchReader.java @@ -59,14 +59,12 @@ public class OrcColumnarBatchReader extends RecordReader { /** * The column IDs of the physical ORC file schema which are required by this reader. * -1 means this required column is partition column, or it doesn't exist in the ORC file. + * Ideally partition column should never appear in the physical file, and should only appear + * in the directory name. However, Spark allows partition columns inside physical file, + * but Spark will discard the values from the file, and use the partition value got from + * directory name. The column order will be reserved though. */ - private int[] requestedColIds; - - /** - * The column IDs of the ORC file partition schema which are required by this reader. - * -1 means this required column doesn't exist in the ORC partition columns. - */ - private int[] requestedPartitionColIds; + private int[] requestedDataColIds; // Record reader from ORC row batch. private org.apache.orc.RecordReader recordReader; @@ -152,29 +150,28 @@ public void initialize( * * @param orcSchema Schema from ORC file reader. * @param requiredFields All the fields that are required to return, including partition fields. - * @param requestedColIds Requested column ids from orcSchema. -1 if not existed. + * @param requestedDataColIds Requested column ids from orcSchema. -1 if not existed. * @param requestedPartitionColIds Requested column ids from partition schema. -1 if not existed. * @param partitionValues Values of partition columns. */ public void initBatch( TypeDescription orcSchema, StructField[] requiredFields, - int[] requestedColIds, + int[] requestedDataColIds, int[] requestedPartitionColIds, InternalRow partitionValues) { batch = orcSchema.createRowBatch(capacity); assert(!batch.selectedInUse); // `selectedInUse` should be initialized with `false`. - assert(requiredFields.length == requestedColIds.length); + assert(requiredFields.length == requestedDataColIds.length); assert(requiredFields.length == requestedPartitionColIds.length); // If a required column is also partition column, use partition value and don't read from file. 
for (int i = 0; i < requiredFields.length; i++) { if (requestedPartitionColIds[i] != -1) { - requestedColIds[i] = -1; + requestedDataColIds[i] = -1; } } - this.requestedPartitionColIds = requestedPartitionColIds; this.requiredFields = requiredFields; - this.requestedColIds = requestedColIds; + this.requestedDataColIds = requestedDataColIds; StructType resultSchema = new StructType(requiredFields); if (copyToSpark) { @@ -190,7 +187,7 @@ public void initBatch( ColumnVectorUtils.populate(columnVectors[i], partitionValues, requestedPartitionColIds[i]); columnVectors[i].setIsConstant(); - } else if (requestedColIds[i] == -1) { + } else if (requestedDataColIds[i] == -1) { columnVectors[i].putNulls(0, capacity); columnVectors[i].setIsConstant(); } @@ -209,7 +206,7 @@ public void initBatch( partitionCol.setIsConstant(); orcVectorWrappers[i] = partitionCol; } else { - int colId = requestedColIds[i]; + int colId = requestedDataColIds[i]; // Initialize the missing columns once. if (colId == -1) { OnHeapColumnVector missingCol = new OnHeapColumnVector(capacity, dt); @@ -240,7 +237,7 @@ private boolean nextBatch() throws IOException { if (!copyToSpark) { for (int i = 0; i < requiredFields.length; i++) { - if (requestedColIds[i] != -1) { + if (requestedDataColIds[i] != -1) { ((OrcColumnVector) orcVectorWrappers[i]).setBatchSize(batchSize); } } @@ -255,8 +252,8 @@ private boolean nextBatch() throws IOException { StructField field = requiredFields[i]; WritableColumnVector toColumn = columnVectors[i]; - if (requestedColIds[i] >= 0) { - ColumnVector fromColumn = batch.cols[requestedColIds[i]]; + if (requestedDataColIds[i] >= 0) { + ColumnVector fromColumn = batch.cols[requestedDataColIds[i]]; if (fromColumn.isRepeating) { putRepeatingValues(batchSize, field, fromColumn, toColumn); diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala index e2ab8010f934..cd10ad21cd82 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala @@ -186,9 +186,8 @@ class OrcFileFormat if (requestedColIdsOrEmptyFile.isEmpty) { Iterator.empty } else { - val requestedColIds = - requestedColIdsOrEmptyFile.get ++ Array.fill(partitionSchema.length)(-1) - assert(requestedColIds.length == resultSchema.length, + val requestedColIds = requestedColIdsOrEmptyFile.get + assert(requestedColIds.length == requiredSchema.length, "[BUG] requested column IDs do not match required schema") val taskConf = new Configuration(conf) taskConf.set(OrcConf.INCLUDE_COLUMNS.getAttribute, @@ -207,13 +206,14 @@ class OrcFileFormat // after opening a file. 
val iter = new RecordReaderIterator(batchReader) Option(TaskContext.get()).foreach(_.addTaskCompletionListener[Unit](_ => iter.close())) + val requestedDataColIds = requestedColIds ++ Array.fill(partitionSchema.length)(-1) val requestedPartitionColIds = Array.fill(requiredSchema.length)(-1) ++ Range(0, partitionSchema.length) batchReader.initialize(fileSplit, taskAttemptContext) batchReader.initBatch( reader.getSchema, resultSchema.fields, - requestedColIds, + requestedDataColIds, requestedPartitionColIds, file.partitionValues) From a3a5741b1b04272f2ce5e395327402b4c17f3013 Mon Sep 17 00:00:00 2001 From: Dongjoon Hyun Date: Thu, 27 Dec 2018 14:35:16 -0800 Subject: [PATCH 3/6] Add `OrcColumnarBatchReaderSuite` --- .../orc/OrcColumnarBatchReader.java | 7 +- .../orc/OrcColumnarBatchReaderSuite.scala | 91 +++++++++++++++++++ 2 files changed, 96 insertions(+), 2 deletions(-) create mode 100644 sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcColumnarBatchReaderSuite.scala diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcColumnarBatchReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcColumnarBatchReader.java index 9535a1fdfdde..7dc90df05a8f 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcColumnarBatchReader.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcColumnarBatchReader.java @@ -20,6 +20,7 @@ import java.io.IOException; import java.util.stream.IntStream; +import com.google.common.annotations.VisibleForTesting; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.RecordReader; @@ -64,7 +65,8 @@ public class OrcColumnarBatchReader extends RecordReader { * but Spark will discard the values from the file, and use the partition value got from * directory name. The column order will be reserved though. */ - private int[] requestedDataColIds; + @VisibleForTesting + public int[] requestedDataColIds; // Record reader from ORC row batch. private org.apache.orc.RecordReader recordReader; @@ -72,7 +74,8 @@ public class OrcColumnarBatchReader extends RecordReader { private StructField[] requiredFields; // The result columnar batch for vectorized execution by whole-stage codegen. - private ColumnarBatch columnarBatch; + @VisibleForTesting + public ColumnarBatch columnarBatch; // Writable column vectors of the result columnar batch. private WritableColumnVector[] columnVectors; diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcColumnarBatchReaderSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcColumnarBatchReaderSuite.scala new file mode 100644 index 000000000000..6ce085e5f8aa --- /dev/null +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcColumnarBatchReaderSuite.scala @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.datasources.orc + +import org.apache.orc.TypeDescription + +import org.apache.spark.sql.QueryTest +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.execution.vectorized.{OnHeapColumnVector, WritableColumnVector} +import org.apache.spark.sql.test.{SharedSQLContext, SQLTestUtils} +import org.apache.spark.sql.types.StructType +import org.apache.spark.unsafe.types.UTF8String.fromString + +class OrcColumnarBatchReaderSuite extends QueryTest with SQLTestUtils with SharedSQLContext { + + private val orcFileSchema = TypeDescription.fromString(s"struct") + private val requiredSchema = StructType.fromDDL("col1 int, col3 int") + private val partitionSchema = StructType.fromDDL("partCol1 string, partCol2 string") + private val partitionValues = InternalRow(fromString("partValue1"), fromString("partValue2")) + private val resultSchema = StructType(requiredSchema.fields ++ partitionSchema.fields) + + private val isConstant = classOf[WritableColumnVector].getDeclaredField("isConstant") + isConstant.setAccessible(true) + + private def getReader(requestedDataColIds: Array[Int], requestedPartitionColIds: Array[Int]) = { + val reader = new OrcColumnarBatchReader(false, false, 4096) + reader.initBatch( + orcFileSchema, + resultSchema.fields, + requestedDataColIds, + requestedPartitionColIds, + partitionValues) + reader + } + + test("requestedPartitionColIds resets requestedDataColIds - all partitions are requested") { + val requestedDataColIds = Array(0, 1, 0, 0) + val requestedPartitionColIds = Array(-1, -1, 0, 1) + val reader = getReader(requestedDataColIds, requestedPartitionColIds) + assert(reader.requestedDataColIds === Array(0, 1, -1, -1)) + } + + test("requestedPartitionColIds resets requestedDataColIds - one partition is requested") { + Seq((Array(-1, -1, 0, -1), Array(0, 1, -1, 0)), + (Array(-1, -1, -1, 0), Array(0, 1, 0, -1))).foreach { + case (requestedPartitionColIds, answer) => + val requestedDataColIds = Array(0, 1, 0, 0) + val reader = getReader(requestedDataColIds, requestedPartitionColIds) + assert(reader.requestedDataColIds === answer) + } + } + + test("initBatch should initialize requested partition columns only") { + val requestedDataColIds = Array(0, -1, -1, -1) // only `col1` is requested, `col3` doesn't exist + val requestedPartitionColIds = Array(-1, -1, 0, -1) // only `partCol1` is requested + val reader = getReader(requestedDataColIds, requestedPartitionColIds) + val batch = reader.columnarBatch + assert(batch.numCols() === 4) + + assert(batch.column(0).isInstanceOf[OrcColumnVector]) + assert(batch.column(1).isInstanceOf[OnHeapColumnVector]) + assert(batch.column(2).isInstanceOf[OnHeapColumnVector]) + assert(batch.column(3).isInstanceOf[OnHeapColumnVector]) + + val col3 = batch.column(1).asInstanceOf[OnHeapColumnVector] + val partCol1 = batch.column(2).asInstanceOf[OnHeapColumnVector] + val partCol2 = batch.column(3).asInstanceOf[OnHeapColumnVector] + + assert(isConstant.get(col3).asInstanceOf[Boolean]) // `col3` is NULL. 
+ assert(isConstant.get(partCol1).asInstanceOf[Boolean]) // Partition column is constant. + assert(isConstant.get(partCol2).asInstanceOf[Boolean]) // Null column is constant. + + assert(partCol1.getUTF8String(0) === partitionValues.getUTF8String(0)) + assert(partCol2.isNullAt(0)) // This is NULL because it's not requested. + } +} From b87ea1e7ded5bfa4d677c6101bec01b0959c3856 Mon Sep 17 00:00:00 2001 From: Gengliang Wang Date: Fri, 28 Dec 2018 15:49:19 +0800 Subject: [PATCH 4/6] update test case --- .../orc/OrcColumnarBatchReaderSuite.scala | 95 ++++++++----------- 1 file changed, 42 insertions(+), 53 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcColumnarBatchReaderSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcColumnarBatchReaderSuite.scala index 6ce085e5f8aa..52abeb20e7f2 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcColumnarBatchReaderSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcColumnarBatchReaderSuite.scala @@ -23,69 +23,58 @@ import org.apache.spark.sql.QueryTest import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.execution.vectorized.{OnHeapColumnVector, WritableColumnVector} import org.apache.spark.sql.test.{SharedSQLContext, SQLTestUtils} -import org.apache.spark.sql.types.StructType +import org.apache.spark.sql.types.{StructField, StructType} import org.apache.spark.unsafe.types.UTF8String.fromString class OrcColumnarBatchReaderSuite extends QueryTest with SQLTestUtils with SharedSQLContext { - - private val orcFileSchema = TypeDescription.fromString(s"struct") - private val requiredSchema = StructType.fromDDL("col1 int, col3 int") - private val partitionSchema = StructType.fromDDL("partCol1 string, partCol2 string") + private val dataSchema = StructType.fromDDL("col1 int, col2 int") + private val partitionSchema = StructType.fromDDL("p1 string, p2 string") private val partitionValues = InternalRow(fromString("partValue1"), fromString("partValue2")) - private val resultSchema = StructType(requiredSchema.fields ++ partitionSchema.fields) - - private val isConstant = classOf[WritableColumnVector].getDeclaredField("isConstant") - isConstant.setAccessible(true) + private val orcFileSchemaList = Seq( + "struct", "struct", + "struct", "struct") + orcFileSchemaList.foreach { case schema => + val orcFileSchema = TypeDescription.fromString(schema) - private def getReader(requestedDataColIds: Array[Int], requestedPartitionColIds: Array[Int]) = { - val reader = new OrcColumnarBatchReader(false, false, 4096) - reader.initBatch( - orcFileSchema, - resultSchema.fields, - requestedDataColIds, - requestedPartitionColIds, - partitionValues) - reader - } - - test("requestedPartitionColIds resets requestedDataColIds - all partitions are requested") { - val requestedDataColIds = Array(0, 1, 0, 0) - val requestedPartitionColIds = Array(-1, -1, 0, 1) - val reader = getReader(requestedDataColIds, requestedPartitionColIds) - assert(reader.requestedDataColIds === Array(0, 1, -1, -1)) - } + val isConstant = classOf[WritableColumnVector].getDeclaredField("isConstant") + isConstant.setAccessible(true) - test("requestedPartitionColIds resets requestedDataColIds - one partition is requested") { - Seq((Array(-1, -1, 0, -1), Array(0, 1, -1, 0)), - (Array(-1, -1, -1, 0), Array(0, 1, 0, -1))).foreach { - case (requestedPartitionColIds, answer) => - val requestedDataColIds = Array(0, 1, 0, 0) - val reader = 
getReader(requestedDataColIds, requestedPartitionColIds) - assert(reader.requestedDataColIds === answer) + def getReader( + requestedDataColIds: Array[Int], + requestedPartitionColIds: Array[Int], + resultFields: Array[StructField]): OrcColumnarBatchReader = { + val reader = new OrcColumnarBatchReader(false, false, 4096) + reader.initBatch( + orcFileSchema, + resultFields, + requestedDataColIds, + requestedPartitionColIds, + partitionValues) + reader } - } - - test("initBatch should initialize requested partition columns only") { - val requestedDataColIds = Array(0, -1, -1, -1) // only `col1` is requested, `col3` doesn't exist - val requestedPartitionColIds = Array(-1, -1, 0, -1) // only `partCol1` is requested - val reader = getReader(requestedDataColIds, requestedPartitionColIds) - val batch = reader.columnarBatch - assert(batch.numCols() === 4) - assert(batch.column(0).isInstanceOf[OrcColumnVector]) - assert(batch.column(1).isInstanceOf[OnHeapColumnVector]) - assert(batch.column(2).isInstanceOf[OnHeapColumnVector]) - assert(batch.column(3).isInstanceOf[OnHeapColumnVector]) + test(s"all partitions are requested: $schema") { + val requestedDataColIds = Array(0, 1, 0, 0) + val requestedPartitionColIds = Array(-1, -1, 0, 1) + val reader = getReader(requestedDataColIds, requestedPartitionColIds, + dataSchema.fields ++ partitionSchema.fields) + assert(reader.requestedDataColIds === Array(0, 1, -1, -1)) + } - val col3 = batch.column(1).asInstanceOf[OnHeapColumnVector] - val partCol1 = batch.column(2).asInstanceOf[OnHeapColumnVector] - val partCol2 = batch.column(3).asInstanceOf[OnHeapColumnVector] + test(s"initBatch should initialize requested partition columns only: $schema") { + val requestedDataColIds = Array(0, -1) // only `col1` is requested, `col2` doesn't exist + val requestedPartitionColIds = Array(-1, 0) // only `p1` is requested + val reader = getReader(requestedDataColIds, requestedPartitionColIds, + Array(dataSchema.fields(0), partitionSchema.fields(0))) + val batch = reader.columnarBatch + assert(batch.numCols() === 2) - assert(isConstant.get(col3).asInstanceOf[Boolean]) // `col3` is NULL. - assert(isConstant.get(partCol1).asInstanceOf[Boolean]) // Partition column is constant. - assert(isConstant.get(partCol2).asInstanceOf[Boolean]) // Null column is constant. + assert(batch.column(0).isInstanceOf[OrcColumnVector]) + assert(batch.column(1).isInstanceOf[OnHeapColumnVector]) - assert(partCol1.getUTF8String(0) === partitionValues.getUTF8String(0)) - assert(partCol2.isNullAt(0)) // This is NULL because it's not requested. + val p1 = batch.column(1).asInstanceOf[OnHeapColumnVector] + assert(isConstant.get(p1).asInstanceOf[Boolean]) // Partition column is constant. 
+ assert(p1.getUTF8String(0) === partitionValues.getUTF8String(0)) + } } } From 1b58df82c6ac5580f14a3b68022b88915c577646 Mon Sep 17 00:00:00 2001 From: Gengliang Wang Date: Fri, 28 Dec 2018 19:41:00 +0800 Subject: [PATCH 5/6] add method PartitioningUtils.requestedPartitionColumnIds --- .../datasources/PartitioningUtils.scala | 19 +++++++++++++++++++ .../datasources/orc/OrcFileFormat.scala | 4 ++-- 2 files changed, 21 insertions(+), 2 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala index 6458b65466fb..85aa8a15d2dc 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala @@ -539,6 +539,25 @@ object PartitioningUtils { }).asNullable } + def requestedPartitionColumnIds( + requiredSchema: StructType, + partitionSchema: StructType, + caseSensitive: Boolean): Array[Int] = { + val columnNameMap = + partitionSchema.fields.map(getColName(_, caseSensitive)).zipWithIndex.toMap + requiredSchema.fields.map { field => + columnNameMap.getOrElse(getColName(field, caseSensitive), -1) + } + } + + private def getColName(f: StructField, caseSensitive: Boolean): String = { + if (caseSensitive) { + f.name + } else { + f.name.toLowerCase(Locale.ROOT) + } + } + private def columnNameEquality(caseSensitive: Boolean): (String, String) => Boolean = { if (caseSensitive) { org.apache.spark.sql.catalyst.analysis.caseSensitiveResolution diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala index cd10ad21cd82..9b9c32fecca8 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala @@ -207,8 +207,8 @@ class OrcFileFormat val iter = new RecordReaderIterator(batchReader) Option(TaskContext.get()).foreach(_.addTaskCompletionListener[Unit](_ => iter.close())) val requestedDataColIds = requestedColIds ++ Array.fill(partitionSchema.length)(-1) - val requestedPartitionColIds = - Array.fill(requiredSchema.length)(-1) ++ Range(0, partitionSchema.length) + val requestedPartitionColIds = PartitioningUtils.requestedPartitionColumnIds( + resultSchema, partitionSchema, isCaseSensitive) batchReader.initialize(fileSplit, taskAttemptContext) batchReader.initBatch( reader.getSchema, From 5ed34d8621cbd644df8b9cc0e8d7b5b27aa3a2ad Mon Sep 17 00:00:00 2001 From: Gengliang Wang Date: Tue, 1 Jan 2019 17:28:09 +0800 Subject: [PATCH 6/6] Revert "add method PartitioningUtils.requestedPartitionColumnIds" This reverts commit 1b58df82c6ac5580f14a3b68022b88915c577646. 
--- .../datasources/PartitioningUtils.scala | 19 ------------------- .../datasources/orc/OrcFileFormat.scala | 4 ++-- 2 files changed, 2 insertions(+), 21 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala index 85aa8a15d2dc..6458b65466fb 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala @@ -539,25 +539,6 @@ object PartitioningUtils { }).asNullable } - def requestedPartitionColumnIds( - requiredSchema: StructType, - partitionSchema: StructType, - caseSensitive: Boolean): Array[Int] = { - val columnNameMap = - partitionSchema.fields.map(getColName(_, caseSensitive)).zipWithIndex.toMap - requiredSchema.fields.map { field => - columnNameMap.getOrElse(getColName(field, caseSensitive), -1) - } - } - - private def getColName(f: StructField, caseSensitive: Boolean): String = { - if (caseSensitive) { - f.name - } else { - f.name.toLowerCase(Locale.ROOT) - } - } - private def columnNameEquality(caseSensitive: Boolean): (String, String) => Boolean = { if (caseSensitive) { org.apache.spark.sql.catalyst.analysis.caseSensitiveResolution diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala index 9b9c32fecca8..cd10ad21cd82 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala @@ -207,8 +207,8 @@ class OrcFileFormat val iter = new RecordReaderIterator(batchReader) Option(TaskContext.get()).foreach(_.addTaskCompletionListener[Unit](_ => iter.close())) val requestedDataColIds = requestedColIds ++ Array.fill(partitionSchema.length)(-1) - val requestedPartitionColIds = PartitioningUtils.requestedPartitionColumnIds( - resultSchema, partitionSchema, isCaseSensitive) + val requestedPartitionColIds = + Array.fill(requiredSchema.length)(-1) ++ Range(0, partitionSchema.length) batchReader.initialize(fileSplit, taskAttemptContext) batchReader.initBatch( reader.getSchema,
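
For reference, a minimal standalone Scala sketch of the column-id convention this series settles on (the names `ColumnIdMapping` and `requestedIds` are made up for illustration and are not part of the patch): for every field of the result schema, the patched `initBatch()` receives one id into the ORC file schema (`requestedDataColIds`) and one id into the partition values (`requestedPartitionColIds`), with -1 meaning "not present"; a partition field keeps its partition index and gets -1 as its data id, a plain data field keeps its file index and gets -1 as its partition id, and a field found in neither becomes a constant-null vector. Note the actual `OrcFileFormat` builds the two arrays positionally (data ids followed by -1s, then -1s followed by 0..numPartitions-1); the name-based lookup below is only meant to show the override that `initBatch()` applies when a partition column also happens to be stored in the file.

object ColumnIdMapping {
  // Compute, per result-schema field, the (data id, partition id) pair described above.
  // -1 means the field is absent from that source. A partition column's data id is
  // forced to -1, mirroring the override loop added to initBatch().
  def requestedIds(
      resultFields: Seq[String],
      fileColumns: Seq[String],
      partitionColumns: Seq[String]): (Array[Int], Array[Int]) = {
    val fileIndex = fileColumns.zipWithIndex.toMap
    val partIndex = partitionColumns.zipWithIndex.toMap
    val dataColIds = new Array[Int](resultFields.length)
    val partitionColIds = new Array[Int](resultFields.length)
    resultFields.zipWithIndex.foreach { case (name, i) =>
      partitionColIds(i) = partIndex.getOrElse(name, -1)
      // Partition value wins even if a column with the same name is stored in the file.
      dataColIds(i) =
        if (partitionColIds(i) != -1) -1 else fileIndex.getOrElse(name, -1)
    }
    (dataColIds, partitionColIds)
  }

  def main(args: Array[String]): Unit = {
    // Result schema col1, col2, p1, p2 against a file that (redundantly) also stores p1 and p2.
    val (dataIds, partIds) = requestedIds(
      Seq("col1", "col2", "p1", "p2"),
      fileColumns = Seq("col1", "col2", "p1", "p2"),
      partitionColumns = Seq("p1", "p2"))
    println(dataIds.mkString(","))  // 0,1,-1,-1
    println(partIds.mkString(","))  // -1,-1,0,1
  }
}

Running `main` prints 0,1,-1,-1 and -1,-1,0,1, the same pair the "all partitions are requested" case in OrcColumnarBatchReaderSuite expects after the override.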