
Commit 300ec1a

wangyum authored and srowen committed
[SPARK-27226][SQL] Reduce the code duplicate when upgrading built-in Hive
## What changes were proposed in this pull request?

This PR is related to #24119. It reduces code duplication when upgrading the built-in Hive. To achieve this, we should avoid using classes in `org.apache.orc.storage.*`, because these classes will be replaced with `org.apache.hadoop.hive.*` after upgrading the built-in Hive. For example:

![image](https://user-images.githubusercontent.com/5399861/54437594-e9be1000-476f-11e9-8878-3b7414871ee5.png)

- Move the usages of `org.apache.orc.storage.*` into `OrcShimUtils`:
  1. Add a wrapper for `VectorizedRowBatch` (reduces code duplication in [OrcColumnarBatchReader](https://github.com/apache/spark/pull/24166/files#diff-e594f7295e5408c01ace8175166313b6)).
  2. Move some serializer/deserializer methods out of `OrcDeserializer` and `OrcSerializer` (reduces code duplication in [OrcDeserializer](https://github.com/apache/spark/pull/24166/files#diff-b933819e6dcaff41eee8fce1e8f2932c) and [OrcSerializer](https://github.com/apache/spark/pull/24166/files#diff-6d3849d88929f6ea25c436d71da729da)).
  3. Define two type aliases, `Operator` and `SearchArgument` (reduces code duplication in [OrcV1FilterSuite](https://github.com/apache/spark/pull/24166/files#diff-48c4fc7a3b3384a6d0aab246723a0058)).
- Move duplicated code into a super class:
  1. Add a trait for `OrcFilters` (reduces code duplication in [OrcFilters](https://github.com/apache/spark/pull/24166/files#diff-224b8cbedf286ecbfdd092d1e2e2f237)).
  2. Move `checkNoFilterPredicate` from `OrcFilterSuite` to `OrcTest` (reduces code duplication in [OrcFilterSuite](https://github.com/apache/spark/pull/24166/files#diff-8e05c1faaaec98edd7723e62f84066f1)).

After this PR, only these four files need to be copied when upgrading the built-in Hive: OrcColumnVector, OrcFilters, OrcFilterSuite and OrcShimUtils.

## How was this patch tested?

Existing tests.

Closes #24166 from wangyum/SPARK-27226.

Authored-by: Yuming Wang <[email protected]>
Signed-off-by: Sean Owen <[email protected]>
1 parent 0bc030c commit 300ec1a
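
To make the shim pattern concrete: below is a minimal, illustrative sketch (not part of this commit) of what the Hive-backed copy of `OrcShimUtils` could look like after the built-in Hive upgrade. As the commit message describes, only the vendor-specific imports need to change; the exact `org.apache.hadoop.hive.*` package paths and the side-by-side placement are assumptions of this sketch.

```scala
package org.apache.spark.sql.execution.datasources.orc

import java.sql.Date

// Assumed Hive locations of the classes that the ORC-bundled copy pulls from
// org.apache.orc.storage.*; the object body is otherwise the same as the new file
// added in this commit (getDateWritable and getHiveDecimalWritable omitted for brevity).
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch
import org.apache.hadoop.hive.ql.io.sarg.{SearchArgument => OrcSearchArgument}
import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf.{Operator => OrcOperator}
import org.apache.hadoop.hive.serde2.io.{DateWritable, HiveDecimalWritable}

import org.apache.spark.sql.types.Decimal

private[sql] object OrcShimUtils {

  // Wraps VectorizedRowBatch so OrcColumnarBatchReader never names the vendor class directly.
  class VectorizedRowBatchWrap(val batch: VectorizedRowBatch) {}

  // Type aliases let callers such as OrcV1FilterSuite compile against either copy.
  private[sql] type Operator = OrcOperator
  private[sql] type SearchArgument = OrcSearchArgument

  def getSqlDate(value: Any): Date = value.asInstanceOf[DateWritable].get

  def getDecimal(value: Any): Decimal = {
    val decimal = value.asInstanceOf[HiveDecimalWritable].getHiveDecimal()
    Decimal(decimal.bigDecimalValue, decimal.precision(), decimal.scale())
  }
}
```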

9 files changed: +174 -89 lines changed

sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcColumnarBatchReader.java

Lines changed: 8 additions & 8 deletions
@@ -30,9 +30,9 @@
 import org.apache.orc.Reader;
 import org.apache.orc.TypeDescription;
 import org.apache.orc.mapred.OrcInputFormat;
-import org.apache.orc.storage.ql.exec.vector.*;
 
 import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.execution.datasources.orc.OrcShimUtils.VectorizedRowBatchWrap;
 import org.apache.spark.sql.execution.vectorized.ColumnVectorUtils;
 import org.apache.spark.sql.execution.vectorized.OnHeapColumnVector;
 import org.apache.spark.sql.types.*;
@@ -48,8 +48,8 @@ public class OrcColumnarBatchReader extends RecordReader<Void, ColumnarBatch> {
   // The capacity of vectorized batch.
   private int capacity;
 
-  // Vectorized ORC Row Batch
-  private VectorizedRowBatch batch;
+  // Vectorized ORC Row Batch wrap.
+  private VectorizedRowBatchWrap wrap;
 
   /**
    * The column IDs of the physical ORC file schema which are required by this reader.
@@ -146,8 +146,8 @@ public void initBatch(
       int[] requestedDataColIds,
       int[] requestedPartitionColIds,
       InternalRow partitionValues) {
-    batch = orcSchema.createRowBatch(capacity);
-    assert(!batch.selectedInUse); // `selectedInUse` should be initialized with `false`.
+    wrap = new VectorizedRowBatchWrap(orcSchema.createRowBatch(capacity));
+    assert(!wrap.batch().selectedInUse); // `selectedInUse` should be initialized with `false`.
     assert(requiredFields.length == requestedDataColIds.length);
     assert(requiredFields.length == requestedPartitionColIds.length);
     // If a required column is also partition column, use partition value and don't read from file.
@@ -180,7 +180,7 @@ public void initBatch(
           missingCol.setIsConstant();
           orcVectorWrappers[i] = missingCol;
         } else {
-          orcVectorWrappers[i] = new OrcColumnVector(dt, batch.cols[colId]);
+          orcVectorWrappers[i] = new OrcColumnVector(dt, wrap.batch().cols[colId]);
         }
       }
     }
@@ -193,8 +193,8 @@ public void initBatch(
    * by copying from ORC VectorizedRowBatch columns to Spark ColumnarBatch columns.
    */
   private boolean nextBatch() throws IOException {
-    recordReader.nextBatch(batch);
-    int batchSize = batch.size;
+    recordReader.nextBatch(wrap.batch());
+    int batchSize = wrap.batch().size;
     if (batchSize == 0) {
       return false;
     }

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcDeserializer.scala

Lines changed: 2 additions & 4 deletions
@@ -19,7 +19,6 @@ package org.apache.spark.sql.execution.datasources.orc
 
 import org.apache.hadoop.io._
 import org.apache.orc.mapred.{OrcList, OrcMap, OrcStruct, OrcTimestamp}
-import org.apache.orc.storage.serde2.io.{DateWritable, HiveDecimalWritable}
 
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.{SpecificInternalRow, UnsafeArrayData}
@@ -109,14 +108,13 @@ class OrcDeserializer(
         updater.set(ordinal, bytes)
 
       case DateType => (ordinal, value) =>
-        updater.setInt(ordinal, DateTimeUtils.fromJavaDate(value.asInstanceOf[DateWritable].get))
+        updater.setInt(ordinal, DateTimeUtils.fromJavaDate(OrcShimUtils.getSqlDate(value)))
 
       case TimestampType => (ordinal, value) =>
        updater.setLong(ordinal, DateTimeUtils.fromJavaTimestamp(value.asInstanceOf[OrcTimestamp]))
 
      case DecimalType.Fixed(precision, scale) => (ordinal, value) =>
-        val decimal = value.asInstanceOf[HiveDecimalWritable].getHiveDecimal()
-        val v = Decimal(decimal.bigDecimalValue, decimal.precision(), decimal.scale())
+        val v = OrcShimUtils.getDecimal(value)
         v.changePrecision(precision, scale)
         updater.set(ordinal, v)

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala

Lines changed: 2 additions & 32 deletions
@@ -23,7 +23,7 @@ import org.apache.orc.storage.ql.io.sarg.SearchArgument.Builder
 import org.apache.orc.storage.ql.io.sarg.SearchArgumentFactory.newBuilder
 import org.apache.orc.storage.serde2.io.HiveDecimalWritable
 
-import org.apache.spark.sql.sources.{And, Filter}
+import org.apache.spark.sql.sources.Filter
 import org.apache.spark.sql.types._
 
 /**
@@ -56,27 +56,7 @@ import org.apache.spark.sql.types._
  * builder methods mentioned above can only be found in test code, where all tested filters are
  * known to be convertible.
  */
-private[sql] object OrcFilters {
-  private[sql] def buildTree(filters: Seq[Filter]): Option[Filter] = {
-    filters match {
-      case Seq() => None
-      case Seq(filter) => Some(filter)
-      case Seq(filter1, filter2) => Some(And(filter1, filter2))
-      case _ => // length > 2
-        val (left, right) = filters.splitAt(filters.length / 2)
-        Some(And(buildTree(left).get, buildTree(right).get))
-    }
-  }
-
-  // Since ORC 1.5.0 (ORC-323), we need to quote for column names with `.` characters
-  // in order to distinguish predicate pushdown for nested columns.
-  private def quoteAttributeNameIfNeeded(name: String) : String = {
-    if (!name.contains("`") && name.contains(".")) {
-      s"`$name`"
-    } else {
-      name
-    }
-  }
+private[sql] object OrcFilters extends OrcFiltersBase {
 
   /**
    * Create ORC filter as a SearchArgument instance.
@@ -101,16 +81,6 @@ private[sql] object OrcFilters {
     } yield filter
   }
 
-  /**
-   * Return true if this is a searchable type in ORC.
-   * Both CharType and VarcharType are cleaned at AstBuilder.
-   */
-  private def isSearchableType(dataType: DataType) = dataType match {
-    case BinaryType => false
-    case _: AtomicType => true
-    case _ => false
-  }
-
   /**
    * Get PredicateLeafType which is corresponding to the given DataType.
    */

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFiltersBase.scala

Lines changed: 58 additions & 0 deletions
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.orc
+
+import org.apache.spark.sql.sources.{And, Filter}
+import org.apache.spark.sql.types.{AtomicType, BinaryType, DataType}
+
+/**
+ * Methods that can be shared when upgrading the built-in Hive.
+ */
+trait OrcFiltersBase {
+
+  private[sql] def buildTree(filters: Seq[Filter]): Option[Filter] = {
+    filters match {
+      case Seq() => None
+      case Seq(filter) => Some(filter)
+      case Seq(filter1, filter2) => Some(And(filter1, filter2))
+      case _ => // length > 2
+        val (left, right) = filters.splitAt(filters.length / 2)
+        Some(And(buildTree(left).get, buildTree(right).get))
+    }
+  }
+
+  // Since ORC 1.5.0 (ORC-323), we need to quote for column names with `.` characters
+  // in order to distinguish predicate pushdown for nested columns.
+  protected def quoteAttributeNameIfNeeded(name: String) : String = {
+    if (!name.contains("`") && name.contains(".")) {
+      s"`$name`"
+    } else {
+      name
+    }
+  }
+
+  /**
+   * Return true if this is a searchable type in ORC.
+   * Both CharType and VarcharType are cleaned at AstBuilder.
+   */
+  protected def isSearchableType(dataType: DataType) = dataType match {
+    case BinaryType => false
+    case _: AtomicType => true
+    case _ => false
+  }
+}
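
For reference, a small usage sketch of the shared `buildTree` helper above; the example object, filter values, and `main` entry point are hypothetical and not part of the diff:

```scala
package org.apache.spark.sql.execution.datasources.orc

import org.apache.spark.sql.sources.{EqualTo, Filter, GreaterThan, IsNotNull}

// buildTree folds a sequence of data source filters into one balanced And tree,
// which OrcFilters then converts into a single ORC SearchArgument.
object BuildTreeExample extends OrcFiltersBase {
  def main(args: Array[String]): Unit = {
    val filters: Seq[Filter] = Seq(IsNotNull("a"), EqualTo("a", 1), GreaterThan("b", 0))
    // Prints: Some(And(IsNotNull(a),And(EqualTo(a,1),GreaterThan(b,0))))
    println(buildTree(filters))
  }
}
```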

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcSerializer.scala

Lines changed: 3 additions & 13 deletions
@@ -20,8 +20,6 @@ package org.apache.spark.sql.execution.datasources.orc
 import org.apache.hadoop.io._
 import org.apache.orc.TypeDescription
 import org.apache.orc.mapred.{OrcList, OrcMap, OrcStruct, OrcTimestamp}
-import org.apache.orc.storage.common.`type`.HiveDecimal
-import org.apache.orc.storage.serde2.io.{DateWritable, HiveDecimalWritable}
 
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.SpecializedGetters
@@ -139,14 +137,7 @@ class OrcSerializer(dataSchema: StructType) {
       new BytesWritable(getter.getBinary(ordinal))
 
     case DateType =>
-      if (reuseObj) {
-        val result = new DateWritable()
-        (getter, ordinal) =>
-          result.set(getter.getInt(ordinal))
-          result
-      } else {
-        (getter, ordinal) => new DateWritable(getter.getInt(ordinal))
-      }
+      OrcShimUtils.getDateWritable(reuseObj)
 
     // The following cases are already expensive, reusing object or not doesn't matter.
 
@@ -156,9 +147,8 @@ class OrcSerializer(dataSchema: StructType) {
       result.setNanos(ts.getNanos)
       result
 
-    case DecimalType.Fixed(precision, scale) => (getter, ordinal) =>
-      val d = getter.getDecimal(ordinal, precision, scale)
-      new HiveDecimalWritable(HiveDecimal.create(d.toJavaBigDecimal))
+    case DecimalType.Fixed(precision, scale) =>
+      OrcShimUtils.getHiveDecimalWritable(precision, scale)
 
     case st: StructType => (getter, ordinal) =>
       val result = createOrcValue(st).asInstanceOf[OrcStruct]

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcShimUtils.scala

Lines changed: 66 additions & 0 deletions
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.orc
+
+import java.sql.Date
+
+import org.apache.orc.storage.common.`type`.HiveDecimal
+import org.apache.orc.storage.ql.exec.vector.VectorizedRowBatch
+import org.apache.orc.storage.ql.io.sarg.{SearchArgument => OrcSearchArgument}
+import org.apache.orc.storage.ql.io.sarg.PredicateLeaf.{Operator => OrcOperator}
+import org.apache.orc.storage.serde2.io.{DateWritable, HiveDecimalWritable}
+
+import org.apache.spark.sql.catalyst.expressions.SpecializedGetters
+import org.apache.spark.sql.types.Decimal
+
+/**
+ * Various utilities for ORC used to upgrade the built-in Hive.
+ */
+private[sql] object OrcShimUtils {
+
+  class VectorizedRowBatchWrap(val batch: VectorizedRowBatch) {}
+
+  private[sql] type Operator = OrcOperator
+  private[sql] type SearchArgument = OrcSearchArgument
+
+  def getSqlDate(value: Any): Date = value.asInstanceOf[DateWritable].get
+
+  def getDecimal(value: Any): Decimal = {
+    val decimal = value.asInstanceOf[HiveDecimalWritable].getHiveDecimal()
+    Decimal(decimal.bigDecimalValue, decimal.precision(), decimal.scale())
+  }
+
+  def getDateWritable(reuseObj: Boolean): (SpecializedGetters, Int) => DateWritable = {
+    if (reuseObj) {
+      val result = new DateWritable()
+      (getter, ordinal) =>
+        result.set(getter.getInt(ordinal))
+        result
+    } else {
+      (getter: SpecializedGetters, ordinal: Int) =>
+        new DateWritable(getter.getInt(ordinal))
+    }
+  }
+
+  def getHiveDecimalWritable(precision: Int, scale: Int):
+      (SpecializedGetters, Int) => HiveDecimalWritable = {
+    (getter, ordinal) =>
+      val d = getter.getDecimal(ordinal, precision, scale)
+      new HiveDecimalWritable(HiveDecimal.create(d.toJavaBigDecimal))
+  }
+}

sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilterSuite.scala

Lines changed: 0 additions & 28 deletions
@@ -89,34 +89,6 @@ class OrcFilterSuite extends OrcTest with SharedSQLContext {
     checkFilterPredicate(df, predicate, checkLogicalOperator)
   }
 
-  protected def checkNoFilterPredicate
-      (predicate: Predicate, noneSupported: Boolean = false)
-      (implicit df: DataFrame): Unit = {
-    val output = predicate.collect { case a: Attribute => a }.distinct
-    val query = df
-      .select(output.map(e => Column(e)): _*)
-      .where(Column(predicate))
-
-    query.queryExecution.optimizedPlan match {
-      case PhysicalOperation(_, filters,
-        DataSourceV2Relation(orcTable: OrcTable, _, options)) =>
-        assert(filters.nonEmpty, "No filter is analyzed from the given query")
-        val scanBuilder = orcTable.newScanBuilder(options)
-        scanBuilder.pushFilters(filters.flatMap(DataSourceStrategy.translateFilter).toArray)
-        val pushedFilters = scanBuilder.pushedFilters()
-        if (noneSupported) {
-          assert(pushedFilters.isEmpty, "Unsupported filters should not show in pushed filters")
-        } else {
-          assert(pushedFilters.nonEmpty, "No filter is pushed down")
-          val maybeFilter = OrcFilters.createFilter(query.schema, pushedFilters)
-          assert(maybeFilter.isEmpty, s"Couldn't generate filter predicate for $pushedFilters")
-        }
-
-      case _ =>
-        throw new AnalysisException("Can not match OrcTable in the query.")
-    }
-  }
-
   test("filter pushdown - integer") {
     withOrcDataFrame((1 to 4).map(i => Tuple1(Option(i)))) { implicit df =>
       checkFilterPredicate('_1.isNull, PredicateLeaf.Operator.IS_NULL)

sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcTest.scala

Lines changed: 33 additions & 1 deletion
@@ -25,7 +25,11 @@ import scala.reflect.runtime.universe.TypeTag
 import org.scalatest.BeforeAndAfterAll
 
 import org.apache.spark.sql._
-import org.apache.spark.sql.execution.datasources.FileBasedDataSourceTest
+import org.apache.spark.sql.catalyst.expressions.{Attribute, Predicate}
+import org.apache.spark.sql.catalyst.planning.PhysicalOperation
+import org.apache.spark.sql.execution.datasources.{DataSourceStrategy, FileBasedDataSourceTest}
+import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
+import org.apache.spark.sql.execution.datasources.v2.orc.OrcTable
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.internal.SQLConf.ORC_IMPLEMENTATION
 
@@ -104,4 +108,32 @@ abstract class OrcTest extends QueryTest with FileBasedDataSourceTest with Befor
       assert(actual < numRows)
     }
   }
+
+  protected def checkNoFilterPredicate
+      (predicate: Predicate, noneSupported: Boolean = false)
+      (implicit df: DataFrame): Unit = {
+    val output = predicate.collect { case a: Attribute => a }.distinct
+    val query = df
+      .select(output.map(e => Column(e)): _*)
+      .where(Column(predicate))
+
+    query.queryExecution.optimizedPlan match {
+      case PhysicalOperation(_, filters,
+        DataSourceV2Relation(orcTable: OrcTable, _, options)) =>
+        assert(filters.nonEmpty, "No filter is analyzed from the given query")
+        val scanBuilder = orcTable.newScanBuilder(options)
+        scanBuilder.pushFilters(filters.flatMap(DataSourceStrategy.translateFilter).toArray)
+        val pushedFilters = scanBuilder.pushedFilters()
+        if (noneSupported) {
+          assert(pushedFilters.isEmpty, "Unsupported filters should not show in pushed filters")
+        } else {
+          assert(pushedFilters.nonEmpty, "No filter is pushed down")
+          val maybeFilter = OrcFilters.createFilter(query.schema, pushedFilters)
+          assert(maybeFilter.isEmpty, s"Couldn't generate filter predicate for $pushedFilters")
+        }
+
+      case _ =>
+        throw new AnalysisException("Can not match OrcTable in the query.")
+    }
+  }
 }
