diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcColumnarBatchReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcColumnarBatchReader.java
index a0d9578a377b..7dc90df05a8f 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcColumnarBatchReader.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcColumnarBatchReader.java
@@ -20,6 +20,7 @@
 import java.io.IOException;
 import java.util.stream.IntStream;
 
+import com.google.common.annotations.VisibleForTesting;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.RecordReader;
@@ -58,9 +59,14 @@ public class OrcColumnarBatchReader extends RecordReader<Void, ColumnarBatch> {
 
   /**
    * The column IDs of the physical ORC file schema which are required by this reader.
-   * -1 means this required column doesn't exist in the ORC file.
+   * -1 means this required column is a partition column, or it doesn't exist in the ORC file.
+   * Ideally a partition column should never appear in the physical file, and should only appear
+   * in the directory name. However, Spark allows partition columns inside the physical file;
+   * in that case Spark discards the values read from the file and uses the partition value
+   * obtained from the directory name. The column order is preserved, though.
    */
-  private int[] requestedColIds;
+  @VisibleForTesting
+  public int[] requestedDataColIds;
 
   // Record reader from ORC row batch.
   private org.apache.orc.RecordReader recordReader;
@@ -68,7 +74,8 @@ public class OrcColumnarBatchReader extends RecordReader<Void, ColumnarBatch> {
   private StructField[] requiredFields;
 
   // The result columnar batch for vectorized execution by whole-stage codegen.
-  private ColumnarBatch columnarBatch;
+  @VisibleForTesting
+  public ColumnarBatch columnarBatch;
 
   // Writable column vectors of the result columnar batch.
   private WritableColumnVector[] columnVectors;
@@ -143,25 +150,33 @@ public void initialize(
   /**
    * Initialize columnar batch by setting required schema and partition information.
    * With this information, this creates ColumnarBatch with the full schema.
+   *
+   * @param orcSchema Schema from the ORC file reader.
+   * @param requiredFields All the fields that are required to return, including partition fields.
+   * @param requestedDataColIds Requested column ids from orcSchema. -1 if not present.
+   * @param requestedPartitionColIds Requested column ids from partition schema. -1 if not present.
+   * @param partitionValues Values of partition columns.
    */
   public void initBatch(
       TypeDescription orcSchema,
-      int[] requestedColIds,
       StructField[] requiredFields,
-      StructType partitionSchema,
+      int[] requestedDataColIds,
+      int[] requestedPartitionColIds,
       InternalRow partitionValues) {
     batch = orcSchema.createRowBatch(capacity);
     assert(!batch.selectedInUse); // `selectedInUse` should be initialized with `false`.
-
+    assert(requiredFields.length == requestedDataColIds.length);
+    assert(requiredFields.length == requestedPartitionColIds.length);
+    // If a required column is also a partition column, use the partition value and don't read from file.
+    for (int i = 0; i < requiredFields.length; i++) {
+      if (requestedPartitionColIds[i] != -1) {
+        requestedDataColIds[i] = -1;
+      }
+    }
     this.requiredFields = requiredFields;
-    this.requestedColIds = requestedColIds;
-    assert(requiredFields.length == requestedColIds.length);
+    this.requestedDataColIds = requestedDataColIds;
 
     StructType resultSchema = new StructType(requiredFields);
-    for (StructField f : partitionSchema.fields()) {
-      resultSchema = resultSchema.add(f);
-    }
-
     if (copyToSpark) {
       if (MEMORY_MODE == MemoryMode.OFF_HEAP) {
         columnVectors = OffHeapColumnVector.allocateColumns(capacity, resultSchema);
@@ -169,22 +184,18 @@ public void initBatch(
         columnVectors = OnHeapColumnVector.allocateColumns(capacity, resultSchema);
       }
 
-      // Initialize the missing columns once.
+      // Initialize the partition columns and missing columns once.
       for (int i = 0; i < requiredFields.length; i++) {
-        if (requestedColIds[i] == -1) {
+        if (requestedPartitionColIds[i] != -1) {
+          ColumnVectorUtils.populate(columnVectors[i],
+            partitionValues, requestedPartitionColIds[i]);
+          columnVectors[i].setIsConstant();
+        } else if (requestedDataColIds[i] == -1) {
           columnVectors[i].putNulls(0, capacity);
           columnVectors[i].setIsConstant();
         }
       }
 
-      if (partitionValues.numFields() > 0) {
-        int partitionIdx = requiredFields.length;
-        for (int i = 0; i < partitionValues.numFields(); i++) {
-          ColumnVectorUtils.populate(columnVectors[i + partitionIdx], partitionValues, i);
-          columnVectors[i + partitionIdx].setIsConstant();
-        }
-      }
-
       columnarBatch = new ColumnarBatch(columnVectors);
     } else {
       // Just wrap the ORC column vector instead of copying it to Spark column vector.
@@ -192,26 +203,22 @@ public void initBatch(
 
       for (int i = 0; i < requiredFields.length; i++) {
         DataType dt = requiredFields[i].dataType();
-        int colId = requestedColIds[i];
-        // Initialize the missing columns once.
-        if (colId == -1) {
-          OnHeapColumnVector missingCol = new OnHeapColumnVector(capacity, dt);
-          missingCol.putNulls(0, capacity);
-          missingCol.setIsConstant();
-          orcVectorWrappers[i] = missingCol;
-        } else {
-          orcVectorWrappers[i] = new OrcColumnVector(dt, batch.cols[colId]);
-        }
-      }
-
-      if (partitionValues.numFields() > 0) {
-        int partitionIdx = requiredFields.length;
-        for (int i = 0; i < partitionValues.numFields(); i++) {
-          DataType dt = partitionSchema.fields()[i].dataType();
+        if (requestedPartitionColIds[i] != -1) {
           OnHeapColumnVector partitionCol = new OnHeapColumnVector(capacity, dt);
-          ColumnVectorUtils.populate(partitionCol, partitionValues, i);
+          ColumnVectorUtils.populate(partitionCol, partitionValues, requestedPartitionColIds[i]);
           partitionCol.setIsConstant();
-          orcVectorWrappers[partitionIdx + i] = partitionCol;
+          orcVectorWrappers[i] = partitionCol;
+        } else {
+          int colId = requestedDataColIds[i];
+          // Initialize the missing columns once.
+          if (colId == -1) {
+            OnHeapColumnVector missingCol = new OnHeapColumnVector(capacity, dt);
+            missingCol.putNulls(0, capacity);
+            missingCol.setIsConstant();
+            orcVectorWrappers[i] = missingCol;
+          } else {
+            orcVectorWrappers[i] = new OrcColumnVector(dt, batch.cols[colId]);
+          }
         }
       }
 
@@ -233,7 +240,7 @@ private boolean nextBatch() throws IOException {
 
     if (!copyToSpark) {
       for (int i = 0; i < requiredFields.length; i++) {
-        if (requestedColIds[i] != -1) {
+        if (requestedDataColIds[i] != -1) {
          ((OrcColumnVector) orcVectorWrappers[i]).setBatchSize(batchSize);
        }
      }
@@ -248,8 +255,8 @@
       StructField field = requiredFields[i];
       WritableColumnVector toColumn = columnVectors[i];
 
-      if (requestedColIds[i] >= 0) {
-        ColumnVector fromColumn = batch.cols[requestedColIds[i]];
+      if (requestedDataColIds[i] >= 0) {
+        ColumnVector fromColumn = batch.cols[requestedDataColIds[i]];
 
         if (fromColumn.isRepeating) {
           putRepeatingValues(batchSize, field, fromColumn, toColumn);
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala
index 4574f8247af5..cd10ad21cd82 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala
@@ -206,13 +206,15 @@ class OrcFileFormat
       // after opening a file.
       val iter = new RecordReaderIterator(batchReader)
       Option(TaskContext.get()).foreach(_.addTaskCompletionListener[Unit](_ => iter.close()))
-
+      val requestedDataColIds = requestedColIds ++ Array.fill(partitionSchema.length)(-1)
+      val requestedPartitionColIds =
+        Array.fill(requiredSchema.length)(-1) ++ Range(0, partitionSchema.length)
       batchReader.initialize(fileSplit, taskAttemptContext)
       batchReader.initBatch(
         reader.getSchema,
-        requestedColIds,
-        requiredSchema.fields,
-        partitionSchema,
+        resultSchema.fields,
+        requestedDataColIds,
+        requestedPartitionColIds,
         file.partitionValues)
 
       iter.asInstanceOf[Iterator[InternalRow]]
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcColumnarBatchReaderSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcColumnarBatchReaderSuite.scala
new file mode 100644
index 000000000000..52abeb20e7f2
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcColumnarBatchReaderSuite.scala
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.orc
+
+import org.apache.orc.TypeDescription
+
+import org.apache.spark.sql.QueryTest
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.execution.vectorized.{OnHeapColumnVector, WritableColumnVector}
+import org.apache.spark.sql.test.{SharedSQLContext, SQLTestUtils}
+import org.apache.spark.sql.types.{StructField, StructType}
+import org.apache.spark.unsafe.types.UTF8String.fromString
+
+class OrcColumnarBatchReaderSuite extends QueryTest with SQLTestUtils with SharedSQLContext {
+  private val dataSchema = StructType.fromDDL("col1 int, col2 int")
+  private val partitionSchema = StructType.fromDDL("p1 string, p2 string")
+  private val partitionValues = InternalRow(fromString("partValue1"), fromString("partValue2"))
+  private val orcFileSchemaList = Seq(
+    "struct<col1:int,col2:int>", "struct<col1:int,col2:int,p1:string,p2:string>",
+    "struct<col1:int,col2:int,p1:string>", "struct<col1:int,col2:int,p2:string>")
+  orcFileSchemaList.foreach { case schema =>
+    val orcFileSchema = TypeDescription.fromString(schema)
+
+    val isConstant = classOf[WritableColumnVector].getDeclaredField("isConstant")
+    isConstant.setAccessible(true)
+
+    def getReader(
+        requestedDataColIds: Array[Int],
+        requestedPartitionColIds: Array[Int],
+        resultFields: Array[StructField]): OrcColumnarBatchReader = {
+      val reader = new OrcColumnarBatchReader(false, false, 4096)
+      reader.initBatch(
+        orcFileSchema,
+        resultFields,
+        requestedDataColIds,
+        requestedPartitionColIds,
+        partitionValues)
+      reader
+    }
+
+    test(s"all partitions are requested: $schema") {
+      val requestedDataColIds = Array(0, 1, 0, 0)
+      val requestedPartitionColIds = Array(-1, -1, 0, 1)
+      val reader = getReader(requestedDataColIds, requestedPartitionColIds,
+        dataSchema.fields ++ partitionSchema.fields)
+      assert(reader.requestedDataColIds === Array(0, 1, -1, -1))
+    }
+
+    test(s"initBatch should initialize requested partition columns only: $schema") {
+      val requestedDataColIds = Array(0, -1) // only `col1` is requested, `col2` doesn't exist
+      val requestedPartitionColIds = Array(-1, 0) // only `p1` is requested
+      val reader = getReader(requestedDataColIds, requestedPartitionColIds,
+        Array(dataSchema.fields(0), partitionSchema.fields(0)))
+      val batch = reader.columnarBatch
+      assert(batch.numCols() === 2)
+
+      assert(batch.column(0).isInstanceOf[OrcColumnVector])
+      assert(batch.column(1).isInstanceOf[OnHeapColumnVector])
+
+      val p1 = batch.column(1).asInstanceOf[OnHeapColumnVector]
+      assert(isConstant.get(p1).asInstanceOf[Boolean]) // Partition column is constant.
+      assert(p1.getUTF8String(0) === partitionValues.getUTF8String(0))
+    }
+  }
+}
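
Note: the sketch below is a minimal, standalone illustration of the new `initBatch` calling convention, not part of the patch. It uses plain Scala with hypothetical column counts and ids, mirrors the id-array construction from the `OrcFileFormat` change above, and reproduces (in `effectiveDataColIds`) the override that `initBatch` applies when a required field is both a data column and a partition column.

```scala
object RequestedColIdsSketch {
  def main(args: Array[String]): Unit = {
    // Hypothetical setup: data columns col1, col2 (col2 missing from the file),
    // partition columns p1, p2. Result schema = data fields ++ partition fields.
    val requestedColIds = Array(0, -1)    // ids of col1, col2 in the ORC file schema
    val partitionSchemaLength = 2         // p1, p2
    val requiredSchemaLength = requestedColIds.length

    // Same construction as in the OrcFileFormat change above.
    val requestedDataColIds = requestedColIds ++ Array.fill(partitionSchemaLength)(-1)
    val requestedPartitionColIds =
      Array.fill(requiredSchemaLength)(-1) ++ Range(0, partitionSchemaLength)

    // initBatch additionally forces the data id to -1 wherever a partition id is set,
    // so a partition column that also exists in the file is never read from the file.
    val effectiveDataColIds = requestedDataColIds.zip(requestedPartitionColIds).map {
      case (_, partId) if partId != -1 => -1
      case (dataId, _) => dataId
    }

    println(requestedDataColIds.mkString("requestedDataColIds:      [", ", ", "]"))
    println(requestedPartitionColIds.mkString("requestedPartitionColIds: [", ", ", "]"))
    println(effectiveDataColIds.mkString("effective data col ids:   [", ", ", "]"))
    // requestedDataColIds:      [0, -1, -1, -1]
    // requestedPartitionColIds: [-1, -1, 0, 1]
    // effective data col ids:   [0, -1, -1, -1]
  }
}
```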