
Commit 3e6598e

dtenedor authored and gengliangwang committed
[SPARK-39265][SQL] Support vectorized Parquet scans with DEFAULT values
### What changes were proposed in this pull request?

Support vectorized Parquet scans when the table schema has associated DEFAULT column values. Example:

```
create table t(i int) using parquet;
insert into t values(42);
alter table t add column s string default concat('abc', 'def');
select * from t;
> 42, 'abcdef'
```

### Why are the changes needed?

This change makes it easier to build, query, and maintain tables backed by Parquet data.

### Does this PR introduce _any_ user-facing change?

Yes.

### How was this patch tested?

This PR includes new test coverage.

Closes #36672 from dtenedor/default-parquet-vectorized.

Authored-by: Daniel Tenedorio <[email protected]>
Signed-off-by: Gengliang Wang <[email protected]>
1 parent c63e37e commit 3e6598e

9 files changed, +97 -31 lines changed

sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetColumnVector.java

Lines changed: 18 additions & 13 deletions
```diff
@@ -55,22 +55,14 @@ final class ParquetColumnVector {
   /** Reader for this column - only set if 'isPrimitive' is true */
   private VectorizedColumnReader columnReader;

-  ParquetColumnVector(
-      ParquetColumn column,
-      WritableColumnVector vector,
-      int capacity,
-      MemoryMode memoryMode,
-      Set<ParquetColumn> missingColumns) {
-    this(column, vector, capacity, memoryMode, missingColumns, true);
-  }
-
   ParquetColumnVector(
       ParquetColumn column,
       WritableColumnVector vector,
       int capacity,
       MemoryMode memoryMode,
       Set<ParquetColumn> missingColumns,
-      boolean isTopLevel) {
+      boolean isTopLevel,
+      Object defaultValue) {
     DataType sparkType = column.sparkType();
     if (!sparkType.sameType(vector.dataType())) {
       throw new IllegalArgumentException("Spark type: " + sparkType +
@@ -83,8 +75,21 @@ final class ParquetColumnVector {
     this.isPrimitive = column.isPrimitive();

     if (missingColumns.contains(column)) {
-      vector.setAllNull();
-      return;
+      if (defaultValue == null) {
+        vector.setAllNull();
+        return;
+      }
+      // For Parquet tables whose columns have associated DEFAULT values, this reader must return
+      // those values instead of NULL when the corresponding columns are not present in storage.
+      // Here we write the 'defaultValue' to each element in the new WritableColumnVector using
+      // the appendObjects method. This delegates to some specific append* method depending on the
+      // type of 'defaultValue'; for example, if 'defaultValue' is a Float, then we call the
+      // appendFloats method.
+      if (!vector.appendObjects(capacity, defaultValue).isPresent()) {
+        throw new IllegalArgumentException("Cannot assign default column value to result " +
+          "column batch in vectorized Parquet reader because the data type is not supported: " +
+          defaultValue);
+      }
     }

     if (isPrimitive) {
@@ -101,7 +106,7 @@

     for (int i = 0; i < column.children().size(); i++) {
       ParquetColumnVector childCv = new ParquetColumnVector(column.children().apply(i),
-        vector.getChild(i), capacity, memoryMode, missingColumns, false);
+        vector.getChild(i), capacity, memoryMode, missingColumns, false, null);
       children.add(childCv);
```
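The decision in the missing-column branch above is the heart of the change. Below is a minimal sketch of that logic in isolation, assuming only the WritableColumnVector API shown in this commit; the helper class and method names are illustrative and not part of the patch.

```java
import org.apache.spark.sql.execution.vectorized.WritableColumnVector;

// Illustrative helper, not part of the patch: mirrors the missing-column branch above.
final class MissingColumnSketch {
  // A missing column with no existence default stays all-null; a column with a default is
  // filled by repeating that default 'capacity' times via the new appendObjects method.
  static void fill(WritableColumnVector vector, int capacity, Object defaultValue) {
    if (defaultValue == null) {
      vector.setAllNull();
      return;
    }
    if (!vector.appendObjects(capacity, defaultValue).isPresent()) {
      throw new IllegalArgumentException(
        "Unsupported data type for default column value: " + defaultValue);
    }
  }
}
```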

sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java

Lines changed: 3 additions & 2 deletions
```diff
@@ -71,6 +71,7 @@ public abstract class SpecificParquetRecordReaderBase<T> extends RecordReader<Vo
   protected MessageType fileSchema;
   protected MessageType requestedSchema;
   protected StructType sparkSchema;
+  protected StructType sparkRequestedSchema;
   // Keep track of the version of the parquet writer. An older version wrote
   // corrupt delta byte arrays, and the version check is needed to detect that.
   protected ParsedVersion writerVersion;
@@ -113,10 +114,10 @@ public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptCont
     fileReader.setRequestedSchema(requestedSchema);
     String sparkRequestedSchemaString =
         configuration.get(ParquetReadSupport$.MODULE$.SPARK_ROW_REQUESTED_SCHEMA());
-    StructType sparkRequestedSchema = StructType$.MODULE$.fromString(sparkRequestedSchemaString);
+    this.sparkRequestedSchema = StructType$.MODULE$.fromString(sparkRequestedSchemaString);
     ParquetToSparkSchemaConverter converter = new ParquetToSparkSchemaConverter(configuration);
     this.parquetColumn = converter.convertParquetColumn(requestedSchema,
-        Option.apply(sparkRequestedSchema));
+        Option.apply(this.sparkRequestedSchema));
     this.sparkSchema = (StructType) parquetColumn.sparkType();
     this.totalRowCount = fileReader.getFilteredRecordCount();
```

sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java

Lines changed: 5 additions & 1 deletion
```diff
@@ -259,8 +259,12 @@ private void initBatch(

     columnVectors = new ParquetColumnVector[sparkSchema.fields().length];
     for (int i = 0; i < columnVectors.length; i++) {
+      Object defaultValue = null;
+      if (sparkRequestedSchema != null) {
+        defaultValue = sparkRequestedSchema.existenceDefaultValues()[i];
+      }
       columnVectors[i] = new ParquetColumnVector(parquetColumn.children().apply(i),
-        vectors[i], capacity, memMode, missingColumns);
+        vectors[i], capacity, memMode, missingColumns, true, defaultValue);
     }

     if (partitionColumns != null) {
```
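For context, the defaults array used above comes from the requested Spark output schema: it holds one entry per top-level field, with null for fields that carry no DEFAULT metadata, so those fields keep the previous all-null behavior when missing from the file. A small sketch under that assumption (the helper class is hypothetical, not part of the patch):

```java
import org.apache.spark.sql.types.StructType;

// Illustrative only, not part of the patch: look up the existence default for one
// top-level output field, or null if the schema is absent or the field has no default.
final class ExistenceDefaultSketch {
  static Object defaultFor(StructType sparkRequestedSchema, int fieldIndex) {
    if (sparkRequestedSchema == null) {
      return null;
    }
    // existenceDefaultValues() is aligned with sparkRequestedSchema.fields().
    return sparkRequestedSchema.existenceDefaultValues()[fieldIndex];
  }
}
```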

sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/WritableColumnVector.java

Lines changed: 52 additions & 0 deletions
```diff
@@ -19,6 +19,7 @@
 import java.math.BigDecimal;
 import java.math.BigInteger;
 import java.nio.ByteBuffer;
+import java.util.Optional;

 import com.google.common.annotations.VisibleForTesting;

@@ -690,6 +691,57 @@ public final int appendStruct(boolean isNull) {
     return elementsAppended;
   }

+  /**
+   * Appends multiple copies of a Java Object to the vector using the corresponding append* method
+   * above.
+   * @param length: The number of instances to append
+   * @param value value to append to the vector
+   * @return the number of values appended if the value maps to one of the append* methods above,
+   * or Optional.empty() otherwise.
+   */
+  public Optional<Integer> appendObjects(int length, Object value) {
+    if (value instanceof Boolean) {
+      return Optional.of(appendBooleans(length, (Boolean) value));
+    }
+    if (value instanceof Byte) {
+      return Optional.of(appendBytes(length, (Byte) value));
+    }
+    if (value instanceof Decimal) {
+      Decimal decimal = (Decimal) value;
+      long unscaled = decimal.toUnscaledLong();
+      if (decimal.precision() < 10) {
+        return Optional.of(appendInts(length, (int) unscaled));
+      } else {
+        return Optional.of(appendLongs(length, unscaled));
+      }
+    }
+    if (value instanceof Double) {
+      return Optional.of(appendDoubles(length, (Double) value));
+    }
+    if (value instanceof Float) {
+      return Optional.of(appendFloats(length, (Float) value));
+    }
+    if (value instanceof Integer) {
+      return Optional.of(appendInts(length, (Integer) value));
+    }
+    if (value instanceof Long) {
+      return Optional.of(appendLongs(length, (Long) value));
+    }
+    if (value instanceof Short) {
+      return Optional.of(appendShorts(length, (Short) value));
+    }
+    if (value instanceof UTF8String) {
+      UTF8String utf8 = (UTF8String) value;
+      byte[] bytes = utf8.getBytes();
+      int result = 0;
+      for (int i = 0; i < length; ++i) {
+        result += appendByteArray(bytes, 0, bytes.length);
+      }
+      return Optional.of(result);
+    }
+    return Optional.empty();
+  }
+
   // `WritableColumnVector` puts the data of array in the first child column vector, and puts the
   // array offsets and lengths in the current column vector.
   @Override
```
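A minimal usage sketch for the new appendObjects method follows; it is illustrative and not part of the patch, and assumes an OnHeapColumnVector holding ints with a default value of 42. It fills every slot with one value and treats an empty Optional as an unsupported default type.

```java
import java.util.Optional;

import org.apache.spark.sql.execution.vectorized.OnHeapColumnVector;
import org.apache.spark.sql.execution.vectorized.WritableColumnVector;
import org.apache.spark.sql.types.DataTypes;

final class AppendObjectsSketch {
  static WritableColumnVector filledWithDefault(int capacity) {
    // Allocate an on-heap int vector and repeat the default value 42 into every slot.
    WritableColumnVector vector = new OnHeapColumnVector(capacity, DataTypes.IntegerType);
    Optional<Integer> appended = vector.appendObjects(capacity, 42);
    if (!appended.isPresent()) {
      // Empty means the value's runtime type has no matching append* method.
      throw new IllegalArgumentException("Unsupported default value type");
    }
    return vector;
  }
}
```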

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala

Lines changed: 7 additions & 3 deletions
```diff
@@ -38,7 +38,7 @@ import org.apache.spark.sql.catalyst.planning.ScanOperation
 import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoDir, InsertIntoStatement, LogicalPlan, Project}
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.catalyst.streaming.StreamingRelationV2
-import org.apache.spark.sql.catalyst.util.V2ExpressionBuilder
+import org.apache.spark.sql.catalyst.util.{ResolveDefaultColumns, V2ExpressionBuilder}
 import org.apache.spark.sql.connector.catalog.SupportsRead
 import org.apache.spark.sql.connector.catalog.TableCapability._
 import org.apache.spark.sql.connector.expressions.{Expression => V2Expression, FieldReference, NullOrdering, SortDirection, SortOrder => V2SortOrder, SortValue}
@@ -61,7 +61,7 @@ import org.apache.spark.unsafe.types.UTF8String
  * Note that, this rule must be run after `PreprocessTableCreation` and
  * `PreprocessTableInsertion`.
  */
-object DataSourceAnalysis extends Rule[LogicalPlan] {
+case class DataSourceAnalysis(analyzer: Analyzer) extends Rule[LogicalPlan] {

   def resolver: Resolver = conf.resolver

@@ -147,7 +147,11 @@ object DataSourceAnalysis extends Rule[LogicalPlan] {

   override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
     case CreateTable(tableDesc, mode, None) if DDLUtils.isDatasourceTable(tableDesc) =>
-      CreateDataSourceTableCommand(tableDesc, ignoreIfExists = mode == SaveMode.Ignore)
+      val newTableDesc: CatalogTable =
+        tableDesc.copy(schema =
+          ResolveDefaultColumns.constantFoldCurrentDefaultsToExistDefaults(
+            analyzer, tableDesc.schema, "CREATE TABLE"))
+      CreateDataSourceTableCommand(newTableDesc, ignoreIfExists = mode == SaveMode.Ignore)

     case CreateTable(tableDesc, mode, Some(query))
         if query.resolved && DDLUtils.isDatasourceTable(tableDesc) =>
```

sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala

Lines changed: 1 addition & 1 deletion
```diff
@@ -195,7 +195,7 @@ abstract class BaseSessionStateBuilder(
       DetectAmbiguousSelfJoin +:
       PreprocessTableCreation(session) +:
       PreprocessTableInsertion +:
-      DataSourceAnalysis +:
+      DataSourceAnalysis(this) +:
       ReplaceCharWithVarchar +:
       customPostHocResolutionRules
```

sql/core/src/test/scala/org/apache/spark/sql/sources/DataSourceAnalysisSuite.scala

Lines changed: 2 additions & 1 deletion
```diff
@@ -21,6 +21,7 @@ import org.scalatest.BeforeAndAfterAll

 import org.apache.spark.SparkFunSuite
 import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.catalyst.analysis.SimpleAnalyzer
 import org.apache.spark.sql.catalyst.dsl.expressions._
 import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, Cast, Expression, Literal}
 import org.apache.spark.sql.catalyst.plans.SQLHelper
@@ -70,7 +71,7 @@ class DataSourceAnalysisSuite extends SparkFunSuite with BeforeAndAfterAll with
         Cast(e, dt, Option(SQLConf.get.sessionLocalTimeZone))
       }
     }
-  val rule = DataSourceAnalysis
+  val rule = DataSourceAnalysis(SimpleAnalyzer)
   testRule(
     "convertStaticPartitions only handle INSERT having at least static partitions",
     caseSensitive) {
```

sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala

Lines changed: 8 additions & 9 deletions
```diff
@@ -1029,24 +1029,22 @@ class InsertSuite extends DataSourceTest with SharedSparkSession {
     }
     // The default value fails to analyze.
     withTable("t") {
-      sql("create table t(i boolean, s bigint default badvalue) using parquet")
       assert(intercept[AnalysisException] {
-        sql("insert into t values (default, default)")
+        sql("create table t(i boolean, s bigint default badvalue) using parquet")
       }.getMessage.contains(Errors.COMMON_SUBSTRING))
     }
     // The default value analyzes to a table not in the catalog.
     withTable("t") {
-      sql("create table t(i boolean, s bigint default (select min(x) from badtable)) using parquet")
       assert(intercept[AnalysisException] {
-        sql("insert into t values (default, default)")
+        sql("create table t(i boolean, s bigint default (select min(x) from badtable)) " +
+          "using parquet")
       }.getMessage.contains(Errors.COMMON_SUBSTRING))
     }
     // The default value parses but refers to a table from the catalog.
     withTable("t", "other") {
       sql("create table other(x string) using parquet")
-      sql("create table t(i boolean, s bigint default (select min(x) from other)) using parquet")
       assert(intercept[AnalysisException] {
-        sql("insert into t values (default, default)")
+        sql("create table t(i boolean, s bigint default (select min(x) from other)) using parquet")
       }.getMessage.contains(Errors.COMMON_SUBSTRING))
     }
     // The default value has an explicit alias. It fails to evaluate when inlined into the VALUES
@@ -1083,10 +1081,9 @@ class InsertSuite extends DataSourceTest with SharedSparkSession {
     }
     // The default value parses but the type is not coercible.
     withTable("t") {
-      sql("create table t(i boolean, s bigint default false) using parquet")
       assert(intercept[AnalysisException] {
-        sql("insert into t values (default, default)")
-      }.getMessage.contains("provided a value of incompatible type"))
+        sql("create table t(i boolean, s bigint default false) using parquet")
+      }.getMessage.contains(Errors.COMMON_SUBSTRING))
     }
     // The number of columns in the INSERT INTO statement is greater than the number of columns in
     // the table.
@@ -1617,6 +1614,8 @@ class InsertSuite extends DataSourceTest with SharedSparkSession {
       TestCase(
         dataSource = "parquet",
         Seq(
+          Config(
+            None),
           Config(
             Some(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "false"),
             insertNullsToStorage = false)))
```

sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionStateBuilder.scala

Lines changed: 1 addition & 1 deletion
```diff
@@ -100,7 +100,7 @@ class HiveSessionStateBuilder(
       RelationConversions(catalog) +:
       PreprocessTableCreation(session) +:
       PreprocessTableInsertion +:
-      DataSourceAnalysis +:
+      DataSourceAnalysis(this) +:
       HiveAnalysis +:
       ReplaceCharWithVarchar +:
       customPostHocResolutionRules
```
