
Commit bdf9dbf

add a wrapper solution for vectorized orc reader
1 parent 2250cb7 commit bdf9dbf

5 files changed: +470 -113 lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala

Lines changed: 7 additions & 0 deletions
@@ -391,6 +391,13 @@ object SQLConf {
     .booleanConf
     .createWithDefault(true)

+  val ORC_COPY_BATCH_TO_SPARK = buildConf("spark.sql.orc.copyBatchToSpark")
+    .doc("Whether or not to copy the ORC columnar batch to Spark columnar batch in the " +
+      "vectorized ORC reader.")
+    .internal()
+    .booleanConf
+    .createWithDefault(false)
+
   val ORC_FILTER_PUSHDOWN_ENABLED = buildConf("spark.sql.orc.filterPushdown")
     .doc("When true, enable filter pushdown for ORC files.")
     .booleanConf
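
The flag is internal and defaults to false, so the wrapper path added in this commit is used unless copying is explicitly requested. A minimal sketch of toggling it at runtime, assuming a local SparkSession and an illustrative /tmp path (neither is part of this commit):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .master("local[*]")
  .appName("orc-copy-batch-demo")
  .getOrCreate()

// false (the default introduced here): ORC column vectors are wrapped directly.
// true: each ORC batch is copied into Spark's writable column vectors.
spark.conf.set("spark.sql.orc.copyBatchToSpark", "true")

// ORC scans from this point on take the copying path.
val df = spark.read.orc("/tmp/people.orc")
df.show()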

sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcColumnVector.java

Lines changed: 235 additions & 0 deletions (new file)
@@ -0,0 +1,235 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.orc;
+
+import java.math.BigDecimal;
+
+import org.apache.orc.storage.ql.exec.vector.*;
+
+import org.apache.spark.sql.types.DataType;
+import org.apache.spark.sql.types.Decimal;
+import org.apache.spark.unsafe.types.UTF8String;
+
+/**
+ * A column vector class wrapping Hive's ColumnVector. Because Spark ColumnarBatch only accepts
+ * Spark's vectorized.ColumnVector, this column vector is used to adapt Hive ColumnVector with
+ * Spark ColumnarVector.
+ */
+public class OrcColumnVector extends org.apache.spark.sql.vectorized.ColumnVector {
+  private ColumnVector baseData;
+  private LongColumnVector longData;
+  private DoubleColumnVector doubleData;
+  private BytesColumnVector bytesData;
+  private DecimalColumnVector decimalData;
+
+  private int batchSize;
+
+  public OrcColumnVector(DataType type, ColumnVector vector) {
+    super(type);
+
+    baseData = vector;
+    if (vector instanceof LongColumnVector) {
+      longData = (LongColumnVector) vector;
+    } else if (vector instanceof DoubleColumnVector) {
+      doubleData = (DoubleColumnVector) vector;
+    } else if (vector instanceof BytesColumnVector) {
+      bytesData = (BytesColumnVector) vector;
+    } else if (vector instanceof DecimalColumnVector) {
+      decimalData = (DecimalColumnVector) vector;
+    } else {
+      throw new UnsupportedOperationException();
+    }
+  }
+
+  public void setBatchSize(int batchSize) {
+    this.batchSize = batchSize;
+  }
+
+  @Override
+  public void close() {
+
+  }
+
+  @Override
+  public int numNulls() {
+    if (baseData.isRepeating) {
+      if (baseData.isNull[0]) {
+        return batchSize;
+      } else {
+        return 0;
+      }
+    } else if (baseData.noNulls) {
+      return 0;
+    } else {
+      int count = 0;
+      for (int i = 0; i < batchSize; i++) {
+        if (baseData.isNull[i]) count++;
+      }
+      return count;
+    }
+  }
+
+  /* A helper method to get the row index in a column. */
+  private int getRowIndex(int rowId) {
+    return baseData.isRepeating ? 0 : rowId;
+  }
+
+  @Override
+  public boolean isNullAt(int rowId) {
+    return baseData.isNull[getRowIndex(rowId)];
+  }
+
+  @Override
+  public boolean getBoolean(int rowId) {
+    return longData.vector[getRowIndex(rowId)] == 1;
+  }
+
+  @Override
+  public boolean[] getBooleans(int rowId, int count) {
+    boolean[] res = new boolean[count];
+    for (int i = 0; i < count; i++) {
+      res[i] = getBoolean(rowId + i);
+    }
+    return res;
+  }
+
+  @Override
+  public byte getByte(int rowId) {
+    return (byte) longData.vector[getRowIndex(rowId)];
+  }
+
+  @Override
+  public byte[] getBytes(int rowId, int count) {
+    byte[] res = new byte[count];
+    for (int i = 0; i < count; i++) {
+      res[i] = getByte(rowId + i);
+    }
+    return res;
+  }
+
+  @Override
+  public short getShort(int rowId) {
+    return (short) longData.vector[getRowIndex(rowId)];
+  }
+
+  @Override
+  public short[] getShorts(int rowId, int count) {
+    short[] res = new short[count];
+    for (int i = 0; i < count; i++) {
+      res[i] = getShort(rowId + i);
+    }
+    return res;
+  }
+
+  @Override
+  public int getInt(int rowId) {
+    return (int) longData.vector[getRowIndex(rowId)];
+  }
+
+  @Override
+  public int[] getInts(int rowId, int count) {
+    int[] res = new int[count];
+    for (int i = 0; i < count; i++) {
+      res[i] = getInt(rowId + i);
+    }
+    return res;
+  }
+
+  @Override
+  public long getLong(int rowId) {
+    return longData.vector[getRowIndex(rowId)];
+  }
+
+  @Override
+  public long[] getLongs(int rowId, int count) {
+    long[] res = new long[count];
+    for (int i = 0; i < count; i++) {
+      res[i] = getLong(rowId + i);
+    }
+    return res;
+  }
+
+  @Override
+  public float getFloat(int rowId) {
+    return (float) doubleData.vector[getRowIndex(rowId)];
+  }
+
+  @Override
+  public float[] getFloats(int rowId, int count) {
+    float[] res = new float[count];
+    for (int i = 0; i < count; i++) {
+      res[i] = getFloat(rowId + i);
+    }
+    return res;
+  }
+
+  @Override
+  public double getDouble(int rowId) {
+    return doubleData.vector[getRowIndex(rowId)];
+  }
+
+  @Override
+  public double[] getDoubles(int rowId, int count) {
+    double[] res = new double[count];
+    for (int i = 0; i < count; i++) {
+      res[i] = getDouble(rowId + i);
+    }
+    return res;
+  }
+
+  @Override
+  public int getArrayLength(int rowId) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public int getArrayOffset(int rowId) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public Decimal getDecimal(int rowId, int precision, int scale) {
+    BigDecimal data = decimalData.vector[getRowIndex(rowId)].getHiveDecimal().bigDecimalValue();
+    return Decimal.apply(data, precision, scale);
+  }
+
+  @Override
+  public UTF8String getUTF8String(int rowId) {
+    int index = getRowIndex(rowId);
+    BytesColumnVector col = bytesData;
+    return UTF8String.fromBytes(col.vector[index], col.start[index], col.length[index]);
+  }
+
+  @Override
+  public byte[] getBinary(int rowId) {
+    int index = getRowIndex(rowId);
+    byte[] binary = new byte[bytesData.length[index]];
+    System.arraycopy(bytesData.vector[index], bytesData.start[index], binary, 0, binary.length);
+    return binary;
+  }
+
+  @Override
+  public org.apache.spark.sql.vectorized.ColumnVector arrayData() {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public org.apache.spark.sql.vectorized.ColumnVector getChildColumn(int ordinal) {
+    throw new UnsupportedOperationException();
+  }
+}
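
To see how the wrapper behaves, here is a test-style Scala sketch (not part of this commit) that wraps an ORC LongColumnVector by hand; it assumes the shaded org.apache.orc.storage classes used by this reader are on the classpath:

import org.apache.orc.storage.ql.exec.vector.LongColumnVector
import org.apache.spark.sql.execution.datasources.orc.OrcColumnVector
import org.apache.spark.sql.types.IntegerType

// An ORC long vector holding three values, the third one null.
val orcVec = new LongColumnVector(1024)
orcVec.vector(0) = 10L
orcVec.vector(1) = 20L
orcVec.noNulls = false
orcVec.isNull(2) = true

// Wrap it; integer values are read out of the underlying long array.
val wrapped = new OrcColumnVector(IntegerType, orcVec)
wrapped.setBatchSize(3)

assert(wrapped.getInt(0) == 10)
assert(wrapped.getInt(1) == 20)
assert(wrapped.isNullAt(2))
assert(wrapped.numNulls == 1)

// If orcVec.isRepeating were true, getRowIndex would map every rowId to 0,
// so all rows would return the single repeated value (or all be null).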

sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcColumnarBatchReader.java

Lines changed: 73 additions & 26 deletions
@@ -82,13 +82,18 @@ public class OrcColumnarBatchReader extends RecordReader<Void, ColumnarBatch> {
   // Writable column vectors of the result columnar batch.
   private WritableColumnVector[] columnVectors;
 
-  /**
-   * The memory mode of the columnarBatch
-   */
+  // The wrapped ORC column vectors. It should be null if `copyToSpark` is true.
+  private org.apache.spark.sql.vectorized.ColumnVector[] orcVectorWrappers;
+
+  // The memory mode of the columnarBatch
   private final MemoryMode MEMORY_MODE;
 
-  public OrcColumnarBatchReader(boolean useOffHeap) {
+  // Whether or not to copy the ORC columnar batch to Spark columnar batch.
+  private final boolean copyToSpark;
+
+  public OrcColumnarBatchReader(boolean useOffHeap, boolean copyToSpark) {
     MEMORY_MODE = useOffHeap ? MemoryMode.OFF_HEAP : MemoryMode.ON_HEAP;
+    this.copyToSpark = copyToSpark;
   }
 
 
@@ -167,27 +172,60 @@ public void initBatch(
     }
 
     int capacity = DEFAULT_SIZE;
-    if (MEMORY_MODE == MemoryMode.OFF_HEAP) {
-      columnVectors = OffHeapColumnVector.allocateColumns(capacity, resultSchema);
-    } else {
-      columnVectors = OnHeapColumnVector.allocateColumns(capacity, resultSchema);
-    }
-    columnarBatch = new ColumnarBatch(resultSchema, columnVectors, capacity);
-
-    if (partitionValues.numFields() > 0) {
-      int partitionIdx = requiredFields.length;
-      for (int i = 0; i < partitionValues.numFields(); i++) {
-        ColumnVectorUtils.populate(columnVectors[i + partitionIdx], partitionValues, i);
-        columnVectors[i + partitionIdx].setIsConstant();
-      }
-    }
-
-    // Initialize the missing columns once.
-    for (int i = 0; i < requiredFields.length; i++) {
-      if (requestedColIds[i] == -1) {
-        columnVectors[i].putNulls(0, columnarBatch.capacity());
-        columnVectors[i].setIsConstant();
-      }
+
+    if (copyToSpark) {
+      if (MEMORY_MODE == MemoryMode.OFF_HEAP) {
+        columnVectors = OffHeapColumnVector.allocateColumns(capacity, resultSchema);
+      } else {
+        columnVectors = OnHeapColumnVector.allocateColumns(capacity, resultSchema);
+      }
+
+      // Initialize the missing columns once.
+      for (int i = 0; i < requiredFields.length; i++) {
+        if (requestedColIds[i] == -1) {
+          columnVectors[i].putNulls(0, capacity);
+          columnVectors[i].setIsConstant();
+        }
+      }
+
+      if (partitionValues.numFields() > 0) {
+        int partitionIdx = requiredFields.length;
+        for (int i = 0; i < partitionValues.numFields(); i++) {
+          ColumnVectorUtils.populate(columnVectors[i + partitionIdx], partitionValues, i);
+          columnVectors[i + partitionIdx].setIsConstant();
+        }
+      }
+
+      columnarBatch = new ColumnarBatch(resultSchema, columnVectors, capacity);
+    } else {
+      // Just wrap the ORC column vector instead of copying it to Spark column vector.
+      orcVectorWrappers = new org.apache.spark.sql.vectorized.ColumnVector[resultSchema.length()];
+
+      for (int i = 0; i < requiredFields.length; i++) {
+        DataType dt = requiredFields[i].dataType();
+        // Initialize the missing columns once.
+        if (requestedColIds[i] == -1) {
+          OnHeapColumnVector missingCol = new OnHeapColumnVector(capacity, dt);
+          missingCol.putNulls(0, capacity);
+          missingCol.setIsConstant();
+          orcVectorWrappers[i] = missingCol;
+        } else {
+          orcVectorWrappers[i] = new OrcColumnVector(dt, batch.cols[i]);
+        }
+      }
+
+      if (partitionValues.numFields() > 0) {
+        int partitionIdx = requiredFields.length;
+        for (int i = 0; i < partitionValues.numFields(); i++) {
+          DataType dt = partitionSchema.fields()[i].dataType();
+          OnHeapColumnVector partitionCol = new OnHeapColumnVector(capacity, dt);
+          ColumnVectorUtils.populate(partitionCol, partitionValues, i);
+          partitionCol.setIsConstant();
+          orcVectorWrappers[partitionIdx + i] = partitionCol;
+        }
+      }
+
+      columnarBatch = new ColumnarBatch(resultSchema, orcVectorWrappers, capacity);
     }
   }
 
@@ -196,17 +234,26 @@ public void initBatch(
    * by copying from ORC VectorizedRowBatch columns to Spark ColumnarBatch columns.
    */
   private boolean nextBatch() throws IOException {
-    for (WritableColumnVector vector : columnVectors) {
-      vector.reset();
-    }
-    columnarBatch.setNumRows(0);
-
     recordReader.nextBatch(batch);
     int batchSize = batch.size;
     if (batchSize == 0) {
      return false;
     }
     columnarBatch.setNumRows(batchSize);
+
+    if (!copyToSpark) {
+      for (int i = 0; i < requiredFields.length; i++) {
+        if (requestedColIds[i] != -1) {
+          ((OrcColumnVector) orcVectorWrappers[i]).setBatchSize(batchSize);
+        }
+      }
+      return true;
+    }
+
+    for (WritableColumnVector vector : columnVectors) {
+      vector.reset();
+    }
+
     for (int i = 0; i < requiredFields.length; i++) {
       StructField field = requiredFields[i];
       WritableColumnVector toColumn = columnVectors[i];