Commit dec9aa3

cloud-fan authored and gatorsmile committed
[SPARK-20961][SQL] generalize the dictionary in ColumnVector
## What changes were proposed in this pull request?

As the first step of https://issues.apache.org/jira/browse/SPARK-20960, to make `ColumnVector` public, this PR generalizes `ColumnVector.dictionary` so that it is not coupled with Parquet.

## How was this patch tested?

Existing tests.

Author: Wenchen Fan <[email protected]>

Closes #18183 from cloud-fan/dictionary.
1 parent c70c38e commit dec9aa3
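
Before the per-file diffs, a condensed sketch of the seam this change creates. The wrapper class and helper method below are illustrative only (in the commit itself the wrap happens inline in VectorizedColumnReader.readBatch); the point is that ColumnVector.setDictionary now accepts the new Spark-owned Dictionary interface rather than Parquet's Dictionary class.

// Illustrative sketch, not code from this commit: the single Parquet-aware
// call site after the change. The helper method and its parameters are
// hypothetical; only the setDictionary(...) line mirrors the real diff.
import org.apache.spark.sql.execution.datasources.parquet.ParquetDictionary;
import org.apache.spark.sql.execution.vectorized.ColumnVector;

final class DictionarySeamSketch {
  static void setLazyDictionary(
      ColumnVector column, org.apache.parquet.column.Dictionary parquetDict) {
    // Before: column.setDictionary(parquetDict) leaked a Parquet type into
    // ColumnVector's API. After: the adapter keeps ColumnVector Parquet-free.
    column.setDictionary(new ParquetDictionary(parquetDict));
  }
}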

File tree

5 files changed: +100 −21
sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetDictionary.java

Lines changed: 53 additions & 0 deletions

@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.parquet;
+
+import org.apache.spark.sql.execution.vectorized.Dictionary;
+
+public final class ParquetDictionary implements Dictionary {
+  private org.apache.parquet.column.Dictionary dictionary;
+
+  public ParquetDictionary(org.apache.parquet.column.Dictionary dictionary) {
+    this.dictionary = dictionary;
+  }
+
+  @Override
+  public int decodeToInt(int id) {
+    return dictionary.decodeToInt(id);
+  }
+
+  @Override
+  public long decodeToLong(int id) {
+    return dictionary.decodeToLong(id);
+  }
+
+  @Override
+  public float decodeToFloat(int id) {
+    return dictionary.decodeToFloat(id);
+  }
+
+  @Override
+  public double decodeToDouble(int id) {
+    return dictionary.decodeToDouble(id);
+  }
+
+  @Override
+  public byte[] decodeToBinary(int id) {
+    return dictionary.decodeToBinary(id).getBytes();
+  }
+}

sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java

Lines changed: 1 addition & 1 deletion

@@ -169,7 +169,7 @@ void readBatch(int total, ColumnVector column) throws IOException {
         // Column vector supports lazy decoding of dictionary values so just set the dictionary.
         // We can't do this if rowId != 0 AND the column doesn't have a dictionary (i.e. some
         // non-dictionary encoded values have already been added).
-        column.setDictionary(dictionary);
+        column.setDictionary(new ParquetDictionary(dictionary));
       } else {
         decodeDictionaryIds(rowId, num, column, dictionaryIds);
       }

sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java

Lines changed: 7 additions & 10 deletions

@@ -154,12 +154,6 @@ public float getProgress() throws IOException, InterruptedException {
     return (float) rowsReturned / totalRowCount;
   }

-  /**
-   * Returns the ColumnarBatch object that will be used for all rows returned by this reader.
-   * This object is reused. Calling this enables the vectorized reader. This should be called
-   * before any calls to nextKeyValue/nextBatch.
-   */
-
   // Creates a columnar batch that includes the schema from the data files and the additional
   // partition columns appended to the end of the batch.
   // For example, if the data contains two columns, with 2 partition columns:

@@ -204,12 +198,17 @@ public void initBatch(StructType partitionColumns, InternalRow partitionValues)
     initBatch(DEFAULT_MEMORY_MODE, partitionColumns, partitionValues);
   }

+  /**
+   * Returns the ColumnarBatch object that will be used for all rows returned by this reader.
+   * This object is reused. Calling this enables the vectorized reader. This should be called
+   * before any calls to nextKeyValue/nextBatch.
+   */
   public ColumnarBatch resultBatch() {
     if (columnarBatch == null) initBatch();
     return columnarBatch;
   }

-  /*
+  /**
    * Can be called before any rows are returned to enable returning columnar batches directly.
    */
   public void enableReturningBatches() {

@@ -237,9 +236,7 @@ public boolean nextBatch() throws IOException {
   }

   private void initializeInternal() throws IOException, UnsupportedOperationException {
-    /**
-     * Check that the requested schema is supported.
-     */
+    // Check that the requested schema is supported.
     missingColumns = new boolean[requestedSchema.getFieldCount()];
     for (int i = 0; i < requestedSchema.getFieldCount(); ++i) {
       Type t = requestedSchema.getFields().get(i);

sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVector.java

Lines changed: 5 additions & 10 deletions

@@ -20,8 +20,6 @@
 import java.math.BigInteger;

 import com.google.common.annotations.VisibleForTesting;
-import org.apache.parquet.column.Dictionary;
-import org.apache.parquet.io.api.Binary;

 import org.apache.spark.memory.MemoryMode;
 import org.apache.spark.sql.catalyst.InternalRow;

@@ -313,8 +311,8 @@ private void throwUnsupportedException(int requiredCapacity, Throwable cause) {
   }

   /**
-   * Ensures that there is enough storage to store capcity elements. That is, the put() APIs
-   * must work for all rowIds < capcity.
+   * Ensures that there is enough storage to store capacity elements. That is, the put() APIs
+   * must work for all rowIds < capacity.
    */
   protected abstract void reserveInternal(int capacity);

@@ -479,7 +477,6 @@ private void throwUnsupportedException(int requiredCapacity, Throwable cause) {

   /**
    * Sets values from [rowId, rowId + count) to [src + srcIndex, src + srcIndex + count)
-   * src should contain `count` doubles written as ieee format.
    */
   public abstract void putFloats(int rowId, int count, float[] src, int srcIndex);

@@ -506,7 +503,6 @@ private void throwUnsupportedException(int requiredCapacity, Throwable cause) {

   /**
    * Sets values from [rowId, rowId + count) to [src + srcIndex, src + srcIndex + count)
-   * src should contain `count` doubles written as ieee format.
    */
   public abstract void putDoubles(int rowId, int count, double[] src, int srcIndex);

@@ -628,8 +624,8 @@ public final UTF8String getUTF8String(int rowId) {
       ColumnVector.Array a = getByteArray(rowId);
       return UTF8String.fromBytes(a.byteArray, a.byteArrayOffset, a.length);
     } else {
-      Binary v = dictionary.decodeToBinary(dictionaryIds.getDictId(rowId));
-      return UTF8String.fromBytes(v.getBytes());
+      byte[] bytes = dictionary.decodeToBinary(dictionaryIds.getDictId(rowId));
+      return UTF8String.fromBytes(bytes);
     }
   }

@@ -643,8 +639,7 @@ public final byte[] getBinary(int rowId) {
       System.arraycopy(array.byteArray, array.byteArrayOffset, bytes, 0, bytes.length);
       return bytes;
     } else {
-      Binary v = dictionary.decodeToBinary(dictionaryIds.getDictId(rowId));
-      return v.getBytes();
+      return dictionary.decodeToBinary(dictionaryIds.getDictId(rowId));
     }
   }
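
Both rewritten call sites above follow the same lazy-decoding pattern: the vector stores only per-row dictionary ids and materializes values through the Dictionary interface at read time. A minimal sketch of the pattern, assuming a plain int[] in place of ColumnVector's internal dictionaryIds vector:

// Sketch only, condensed from getBinary/getUTF8String above. `ids` stands in
// for ColumnVector.dictionaryIds; the real code calls getDictId(rowId) on a
// nested ColumnVector instead of indexing an array.
import org.apache.spark.sql.execution.vectorized.Dictionary;

final class LazyDictionaryReadSketch {
  private final Dictionary dictionary;
  private final int[] ids;  // one dictionary id per row

  LazyDictionaryReadSketch(Dictionary dictionary, int[] ids) {
    this.dictionary = dictionary;
    this.ids = ids;
  }

  // Decoding happens only when a row is actually read, so rows that are
  // skipped or filtered never pay the cost of materializing the value.
  byte[] getBinary(int rowId) {
    return dictionary.decodeToBinary(ids[rowId]);
  }
}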

sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/Dictionary.java

Lines changed: 34 additions & 0 deletions

@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.vectorized;
+
+/**
+ * The interface for dictionary in ColumnVector to decode dictionary encoded values.
+ */
+public interface Dictionary {
+
+  int decodeToInt(int id);
+
+  long decodeToLong(int id);
+
+  float decodeToFloat(int id);
+
+  double decodeToDouble(int id);
+
+  byte[] decodeToBinary(int id);
+}
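
Because the interface carries no Parquet dependency, any columnar data source can now feed dictionary-encoded values into ColumnVector. A minimal sketch, assuming nothing beyond the interface above: an in-memory dictionary backed by a long[] that supports only the integral decode paths and throws for the rest, as a type-specific dictionary may.

// Hypothetical example, not part of this commit: a non-Parquet Dictionary
// implementation, demonstrating that the vectorized layer is decoupled from
// org.apache.parquet.column.Dictionary.
import org.apache.spark.sql.execution.vectorized.Dictionary;

public final class LongArrayDictionary implements Dictionary {
  private final long[] values;  // dictionary id -> decoded value

  public LongArrayDictionary(long[] values) {
    this.values = values;
  }

  @Override
  public int decodeToInt(int id) {
    return (int) values[id];
  }

  @Override
  public long decodeToLong(int id) {
    return values[id];
  }

  @Override
  public float decodeToFloat(int id) {
    throw new UnsupportedOperationException("long dictionary, no floats");
  }

  @Override
  public double decodeToDouble(int id) {
    throw new UnsupportedOperationException("long dictionary, no doubles");
  }

  @Override
  public byte[] decodeToBinary(int id) {
    throw new UnsupportedOperationException("long dictionary, no binaries");
  }
}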
