
Commit 8cdc0d4

yhuai authored and liancheng committed
[SPARK-9876] [BRANCH-2.0] Revert "[SPARK-9876][SQL] Update Parquet to 1.8.1."
## What changes were proposed in this pull request?

Since we are pretty late in the 2.0 release cycle, it is not clear if this upgrade can be tested thoroughly and if we can resolve the regression issue that we observed before. This PR reverts #13280 from branch 2.0.

## How was this patch tested?

Existing tests.

This reverts commit 776d183.

Author: Yin Huai <[email protected]>

Closes #13450 from yhuai/revertParquet1.8.1-branch-2.0.
1 parent e033fd5 commit 8cdc0d4

File tree

11 files changed: +117 additions, −91 deletions


dev/deps/spark-deps-hadoop-2.2

Lines changed: 6 additions & 5 deletions
@@ -129,13 +129,14 @@ opencsv-2.3.jar
 oro-2.0.8.jar
 osgi-resource-locator-1.0.1.jar
 paranamer-2.3.jar
-parquet-column-1.8.1.jar
-parquet-common-1.8.1.jar
-parquet-encoding-1.8.1.jar
+parquet-column-1.7.0.jar
+parquet-common-1.7.0.jar
+parquet-encoding-1.7.0.jar
 parquet-format-2.3.0-incubating.jar
-parquet-hadoop-1.8.1.jar
+parquet-generator-1.7.0.jar
+parquet-hadoop-1.7.0.jar
 parquet-hadoop-bundle-1.6.0.jar
-parquet-jackson-1.8.1.jar
+parquet-jackson-1.7.0.jar
 pmml-model-1.2.15.jar
 pmml-schema-1.2.15.jar
 protobuf-java-2.5.0.jar

dev/deps/spark-deps-hadoop-2.3

Lines changed: 6 additions & 5 deletions
@@ -136,13 +136,14 @@ opencsv-2.3.jar
 oro-2.0.8.jar
 osgi-resource-locator-1.0.1.jar
 paranamer-2.3.jar
-parquet-column-1.8.1.jar
-parquet-common-1.8.1.jar
-parquet-encoding-1.8.1.jar
+parquet-column-1.7.0.jar
+parquet-common-1.7.0.jar
+parquet-encoding-1.7.0.jar
 parquet-format-2.3.0-incubating.jar
-parquet-hadoop-1.8.1.jar
+parquet-generator-1.7.0.jar
+parquet-hadoop-1.7.0.jar
 parquet-hadoop-bundle-1.6.0.jar
-parquet-jackson-1.8.1.jar
+parquet-jackson-1.7.0.jar
 pmml-model-1.2.15.jar
 pmml-schema-1.2.15.jar
 protobuf-java-2.5.0.jar

dev/deps/spark-deps-hadoop-2.4

Lines changed: 6 additions & 5 deletions
@@ -136,13 +136,14 @@ opencsv-2.3.jar
 oro-2.0.8.jar
 osgi-resource-locator-1.0.1.jar
 paranamer-2.3.jar
-parquet-column-1.8.1.jar
-parquet-common-1.8.1.jar
-parquet-encoding-1.8.1.jar
+parquet-column-1.7.0.jar
+parquet-common-1.7.0.jar
+parquet-encoding-1.7.0.jar
 parquet-format-2.3.0-incubating.jar
-parquet-hadoop-1.8.1.jar
+parquet-generator-1.7.0.jar
+parquet-hadoop-1.7.0.jar
 parquet-hadoop-bundle-1.6.0.jar
-parquet-jackson-1.8.1.jar
+parquet-jackson-1.7.0.jar
 pmml-model-1.2.15.jar
 pmml-schema-1.2.15.jar
 protobuf-java-2.5.0.jar

dev/deps/spark-deps-hadoop-2.6

Lines changed: 6 additions & 5 deletions
@@ -144,13 +144,14 @@ opencsv-2.3.jar
 oro-2.0.8.jar
 osgi-resource-locator-1.0.1.jar
 paranamer-2.3.jar
-parquet-column-1.8.1.jar
-parquet-common-1.8.1.jar
-parquet-encoding-1.8.1.jar
+parquet-column-1.7.0.jar
+parquet-common-1.7.0.jar
+parquet-encoding-1.7.0.jar
 parquet-format-2.3.0-incubating.jar
-parquet-hadoop-1.8.1.jar
+parquet-generator-1.7.0.jar
+parquet-hadoop-1.7.0.jar
 parquet-hadoop-bundle-1.6.0.jar
-parquet-jackson-1.8.1.jar
+parquet-jackson-1.7.0.jar
 pmml-model-1.2.15.jar
 pmml-schema-1.2.15.jar
 protobuf-java-2.5.0.jar

dev/deps/spark-deps-hadoop-2.7

Lines changed: 6 additions & 5 deletions
@@ -145,13 +145,14 @@ opencsv-2.3.jar
 oro-2.0.8.jar
 osgi-resource-locator-1.0.1.jar
 paranamer-2.3.jar
-parquet-column-1.8.1.jar
-parquet-common-1.8.1.jar
-parquet-encoding-1.8.1.jar
+parquet-column-1.7.0.jar
+parquet-common-1.7.0.jar
+parquet-encoding-1.7.0.jar
 parquet-format-2.3.0-incubating.jar
-parquet-hadoop-1.8.1.jar
+parquet-generator-1.7.0.jar
+parquet-hadoop-1.7.0.jar
 parquet-hadoop-bundle-1.6.0.jar
-parquet-jackson-1.8.1.jar
+parquet-jackson-1.7.0.jar
 pmml-model-1.2.15.jar
 pmml-schema-1.2.15.jar
 protobuf-java-2.5.0.jar

pom.xml

Lines changed: 1 addition & 1 deletion
@@ -133,7 +133,7 @@
     <!-- Version used for internal directory structure -->
     <hive.version.short>1.2.1</hive.version.short>
     <derby.version>10.11.1.1</derby.version>
-    <parquet.version>1.8.1</parquet.version>
+    <parquet.version>1.7.0</parquet.version>
     <hive.parquet.version>1.6.0</hive.parquet.version>
     <jetty.version>9.2.16.v20160414</jetty.version>
     <javaxservlet.version>3.1.0</javaxservlet.version>
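
The parquet.version property above is what actually pins the downgrade; the parquet-* entries in the dependency manifests follow from it. As a sanity check, one can confirm which parquet-mr build ends up on the classpath at runtime. The snippet below is a hypothetical verification sketch (not part of this commit); it only assumes a well-known parquet-column class and standard JVM reflection:

import scala.Predef._

// Locate the jar that provides a well-known parquet-column class and print its path.
// After this revert it should point at parquet-column-1.7.0.jar.
val parquetClass = Class.forName("org.apache.parquet.column.ParquetProperties")
println(parquetClass.getProtectionDomain.getCodeSource.getLocation)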

sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java

Lines changed: 7 additions & 13 deletions
@@ -58,8 +58,6 @@
 import org.apache.parquet.hadoop.metadata.ParquetMetadata;
 import org.apache.parquet.hadoop.util.ConfigurationUtil;
 import org.apache.parquet.schema.MessageType;
-import org.apache.parquet.schema.PrimitiveType;
-import org.apache.parquet.schema.Type;
 import org.apache.parquet.schema.Types;
 import org.apache.spark.sql.types.StructType;
 
@@ -188,19 +186,15 @@ protected void initialize(String path, List<String> columns) throws IOException
     if (columns == null) {
       this.requestedSchema = fileSchema;
     } else {
-      if (columns.size() > 0) {
-        Types.MessageTypeBuilder builder = Types.buildMessage();
-        for (String s: columns) {
-          if (!fileSchema.containsField(s)) {
-            throw new IOException("Can only project existing columns. Unknown field: " + s +
-                " File schema:\n" + fileSchema);
-          }
-          builder.addFields(fileSchema.getType(s));
+      Types.MessageTypeBuilder builder = Types.buildMessage();
+      for (String s: columns) {
+        if (!fileSchema.containsField(s)) {
+          throw new IOException("Can only project existing columns. Unknown field: " + s +
+              " File schema:\n" + fileSchema);
         }
-        this.requestedSchema = builder.named(CatalystSchemaConverter.SPARK_PARQUET_SCHEMA_NAME());
-      } else {
-        this.requestedSchema = CatalystSchemaConverter.EMPTY_MESSAGE();
+        builder.addFields(fileSchema.getType(s));
       }
+      this.requestedSchema = builder.named("spark_schema");
     }
     this.sparkSchema = new CatalystSchemaConverter(config).convert(requestedSchema);
     this.reader = new ParquetFileReader(config, file, blocks, requestedSchema.getColumns());
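
The net effect of this hunk is that the reverted reader always routes column projection through a Types.buildMessage() builder and names the result "spark_schema" directly, instead of special-casing an empty column list through CatalystSchemaConverter.EMPTY_MESSAGE. A minimal Scala sketch of that projection logic against the parquet-mr 1.7.0 schema API follows; the helper itself is illustrative, only the builder calls are taken from the diff:

import org.apache.parquet.schema.{MessageType, Types}

// Illustrative helper mirroring the reverted Java code: build the requested schema by
// copying the selected fields out of the file schema.
def projectSchema(fileSchema: MessageType, columns: Seq[String]): MessageType = {
  val builder = Types.buildMessage()
  columns.foreach { name =>
    require(fileSchema.containsField(name),
      s"Can only project existing columns. Unknown field: $name")
    builder.addFields(fileSchema.getType(name))
  }
  builder.named("spark_schema")
}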

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystReadSupport.scala

Lines changed: 4 additions & 8 deletions
@@ -109,14 +109,10 @@ private[parquet] object CatalystReadSupport {
    */
   def clipParquetSchema(parquetSchema: MessageType, catalystSchema: StructType): MessageType = {
     val clippedParquetFields = clipParquetGroupFields(parquetSchema.asGroupType(), catalystSchema)
-    if (clippedParquetFields.isEmpty) {
-      CatalystSchemaConverter.EMPTY_MESSAGE
-    } else {
-      Types
-        .buildMessage()
-        .addFields(clippedParquetFields: _*)
-        .named(CatalystSchemaConverter.SPARK_PARQUET_SCHEMA_NAME)
-    }
+    Types
+      .buildMessage()
+      .addFields(clippedParquetFields: _*)
+      .named(CatalystSchemaConverter.SPARK_PARQUET_SCHEMA_NAME)
   }
 
   private def clipParquetType(parquetType: Type, catalystType: DataType): Type = {
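
With the isEmpty branch gone, a query that needs no Parquet columns at all simply produces an empty clipped schema, which parquet-mr 1.7.0 accepts. A hedged usage sketch of the query shape involved; the path and session setup are hypothetical, only the query shape matters:

import org.apache.spark.sql.SparkSession

// Hypothetical local session and table.
val spark = SparkSession.builder().master("local[2]").appName("count-star").getOrCreate()
spark.read.parquet("/tmp/events").createOrReplaceTempView("t")

// No column values are needed to answer this, so the clipped Parquet schema has no fields.
spark.sql("SELECT COUNT(*) FROM t").show()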

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystSchemaConverter.scala

Lines changed: 0 additions & 16 deletions
@@ -538,22 +538,6 @@ private[parquet] class CatalystSchemaConverter(
 private[parquet] object CatalystSchemaConverter {
   val SPARK_PARQUET_SCHEMA_NAME = "spark_schema"
 
-  // !! HACK ALERT !!
-  //
-  // PARQUET-363 & PARQUET-278: parquet-mr 1.8.1 doesn't allow constructing empty GroupType,
-  // which prevents us to avoid selecting any columns for queries like `SELECT COUNT(*) FROM t`.
-  // This issue has been fixed in parquet-mr 1.8.2-SNAPSHOT.
-  //
-  // To workaround this problem, here we first construct a `MessageType` with a single dummy
-  // field, and then remove the field to obtain an empty `MessageType`.
-  //
-  // TODO Reverts this change after upgrading parquet-mr to 1.8.2+
-  val EMPTY_MESSAGE = Types
-    .buildMessage()
-    .required(PrimitiveType.PrimitiveTypeName.INT32).named("dummy")
-    .named(CatalystSchemaConverter.SPARK_PARQUET_SCHEMA_NAME)
-  EMPTY_MESSAGE.getFields.clear()
-
   def checkFieldName(name: String): Unit = {
     // ,;{}()\n\t= and space are special characters in Parquet schema
     checkConversionRequirement(
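
The deleted block above was only needed because parquet-mr 1.8.1 rejects empty group types. A small Scala sketch contrasting the two behaviours, assuming the parquet-mr schema builder API (this is not Spark code; the second construction is the deleted dummy-field trick restated):

import org.apache.parquet.schema.Types
import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT32

// Under parquet-mr 1.7.0 an empty message type can be built directly, so no workaround is needed.
val emptyMessage = Types.buildMessage().named("spark_schema")

// Under 1.8.1 (PARQUET-363 / PARQUET-278) the same call fails for empty groups, which is why the
// deleted code built a message with a dummy INT32 field and then cleared the field list.
val viaDummyField = Types.buildMessage()
  .required(INT32).named("dummy")
  .named("spark_schema")
viaDummyField.getFields.clear()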

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala

Lines changed: 69 additions & 14 deletions
@@ -22,6 +22,8 @@ import java.io.Serializable
 import org.apache.parquet.filter2.predicate._
 import org.apache.parquet.filter2.predicate.FilterApi._
 import org.apache.parquet.io.api.Binary
+import org.apache.parquet.schema.OriginalType
+import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName
 
 import org.apache.spark.sql.sources
 import org.apache.spark.sql.types._
@@ -51,15 +53,18 @@ private[sql] object ParquetFilters {
     case DoubleType =>
       (n: String, v: Any) => FilterApi.eq(doubleColumn(n), v.asInstanceOf[java.lang.Double])
 
+    // See https://issues.apache.org/jira/browse/SPARK-11153
+    /*
     // Binary.fromString and Binary.fromByteArray don't accept null values
     case StringType =>
       (n: String, v: Any) => FilterApi.eq(
         binaryColumn(n),
-        Option(v).map(s => Binary.fromString(s.asInstanceOf[String])).orNull)
+        Option(v).map(s => Binary.fromByteArray(s.asInstanceOf[String].getBytes("utf-8"))).orNull)
     case BinaryType =>
       (n: String, v: Any) => FilterApi.eq(
         binaryColumn(n),
-        Option(v).map(b => Binary.fromReusedByteArray(v.asInstanceOf[Array[Byte]])).orNull)
+        Option(v).map(b => Binary.fromByteArray(v.asInstanceOf[Array[Byte]])).orNull)
+    */
   }
 
   private val makeNotEq: PartialFunction[DataType, (String, Any) => FilterPredicate] = {
@@ -74,14 +79,17 @@ private[sql] object ParquetFilters {
     case DoubleType =>
       (n: String, v: Any) => FilterApi.notEq(doubleColumn(n), v.asInstanceOf[java.lang.Double])
 
+    // See https://issues.apache.org/jira/browse/SPARK-11153
+    /*
     case StringType =>
       (n: String, v: Any) => FilterApi.notEq(
         binaryColumn(n),
-        Option(v).map(s => Binary.fromString(s.asInstanceOf[String])).orNull)
+        Option(v).map(s => Binary.fromByteArray(s.asInstanceOf[String].getBytes("utf-8"))).orNull)
     case BinaryType =>
       (n: String, v: Any) => FilterApi.notEq(
         binaryColumn(n),
-        Option(v).map(b => Binary.fromReusedByteArray(v.asInstanceOf[Array[Byte]])).orNull)
+        Option(v).map(b => Binary.fromByteArray(v.asInstanceOf[Array[Byte]])).orNull)
+    */
   }
 
   private val makeLt: PartialFunction[DataType, (String, Any) => FilterPredicate] = {
@@ -94,13 +102,16 @@ private[sql] object ParquetFilters {
     case DoubleType =>
       (n: String, v: Any) => FilterApi.lt(doubleColumn(n), v.asInstanceOf[java.lang.Double])
 
+    // See https://issues.apache.org/jira/browse/SPARK-11153
+    /*
     case StringType =>
       (n: String, v: Any) =>
         FilterApi.lt(binaryColumn(n),
-          Binary.fromString(v.asInstanceOf[String]))
+          Binary.fromByteArray(v.asInstanceOf[String].getBytes("utf-8")))
     case BinaryType =>
       (n: String, v: Any) =>
-        FilterApi.lt(binaryColumn(n), Binary.fromReusedByteArray(v.asInstanceOf[Array[Byte]]))
+        FilterApi.lt(binaryColumn(n), Binary.fromByteArray(v.asInstanceOf[Array[Byte]]))
+    */
   }
 
   private val makeLtEq: PartialFunction[DataType, (String, Any) => FilterPredicate] = {
@@ -113,13 +124,16 @@ private[sql] object ParquetFilters {
     case DoubleType =>
       (n: String, v: Any) => FilterApi.ltEq(doubleColumn(n), v.asInstanceOf[java.lang.Double])
 
+    // See https://issues.apache.org/jira/browse/SPARK-11153
+    /*
     case StringType =>
       (n: String, v: Any) =>
        FilterApi.ltEq(binaryColumn(n),
-         Binary.fromString(v.asInstanceOf[String]))
+         Binary.fromByteArray(v.asInstanceOf[String].getBytes("utf-8")))
     case BinaryType =>
       (n: String, v: Any) =>
-        FilterApi.ltEq(binaryColumn(n), Binary.fromReusedByteArray(v.asInstanceOf[Array[Byte]]))
+        FilterApi.ltEq(binaryColumn(n), Binary.fromByteArray(v.asInstanceOf[Array[Byte]]))
+    */
   }
 
   private val makeGt: PartialFunction[DataType, (String, Any) => FilterPredicate] = {
@@ -133,13 +147,15 @@ private[sql] object ParquetFilters {
       (n: String, v: Any) => FilterApi.gt(doubleColumn(n), v.asInstanceOf[java.lang.Double])
 
     // See https://issues.apache.org/jira/browse/SPARK-11153
+    /*
     case StringType =>
       (n: String, v: Any) =>
        FilterApi.gt(binaryColumn(n),
-         Binary.fromString(v.asInstanceOf[String]))
+         Binary.fromByteArray(v.asInstanceOf[String].getBytes("utf-8")))
     case BinaryType =>
       (n: String, v: Any) =>
-        FilterApi.gt(binaryColumn(n), Binary.fromReusedByteArray(v.asInstanceOf[Array[Byte]]))
+        FilterApi.gt(binaryColumn(n), Binary.fromByteArray(v.asInstanceOf[Array[Byte]]))
+    */
   }
 
   private val makeGtEq: PartialFunction[DataType, (String, Any) => FilterPredicate] = {
@@ -152,13 +168,16 @@ private[sql] object ParquetFilters {
     case DoubleType =>
       (n: String, v: Any) => FilterApi.gtEq(doubleColumn(n), v.asInstanceOf[java.lang.Double])
 
+    // See https://issues.apache.org/jira/browse/SPARK-11153
+    /*
     case StringType =>
       (n: String, v: Any) =>
        FilterApi.gtEq(binaryColumn(n),
-         Binary.fromString(v.asInstanceOf[String]))
+         Binary.fromByteArray(v.asInstanceOf[String].getBytes("utf-8")))
     case BinaryType =>
       (n: String, v: Any) =>
-        FilterApi.gtEq(binaryColumn(n), Binary.fromReusedByteArray(v.asInstanceOf[Array[Byte]]))
+        FilterApi.gtEq(binaryColumn(n), Binary.fromByteArray(v.asInstanceOf[Array[Byte]]))
+    */
   }
 
   private val makeInSet: PartialFunction[DataType, (String, Set[Any]) => FilterPredicate] = {
@@ -175,14 +194,17 @@ private[sql] object ParquetFilters {
       (n: String, v: Set[Any]) =>
         FilterApi.userDefined(doubleColumn(n), SetInFilter(v.asInstanceOf[Set[java.lang.Double]]))
 
+    // See https://issues.apache.org/jira/browse/SPARK-11153
+    /*
     case StringType =>
       (n: String, v: Set[Any]) =>
         FilterApi.userDefined(binaryColumn(n),
-          SetInFilter(v.map(s => Binary.fromString(s.asInstanceOf[String]))))
+          SetInFilter(v.map(s => Binary.fromByteArray(s.asInstanceOf[String].getBytes("utf-8")))))
     case BinaryType =>
      (n: String, v: Set[Any]) =>
        FilterApi.userDefined(binaryColumn(n),
-         SetInFilter(v.map(e => Binary.fromReusedByteArray(e.asInstanceOf[Array[Byte]]))))
+         SetInFilter(v.map(e => Binary.fromByteArray(e.asInstanceOf[Array[Byte]]))))
+    */
   }
 
   /**
@@ -206,6 +228,8 @@ private[sql] object ParquetFilters {
   def createFilter(schema: StructType, predicate: sources.Filter): Option[FilterPredicate] = {
     val dataTypeOf = getFieldMap(schema).toMap
 
+    relaxParquetValidTypeMap
+
     // NOTE:
     //
     // For any comparison operator `cmp`, both `a cmp NULL` and `NULL cmp a` evaluate to `NULL`,
@@ -275,4 +299,35 @@ private[sql] object ParquetFilters {
       case _ => None
     }
   }
+
+  // !! HACK ALERT !!
+  //
+  // This lazy val is a workaround for PARQUET-201, and should be removed once we upgrade to
+  // parquet-mr 1.8.1 or higher versions.
+  //
+  // In Parquet, not all types of columns can be used for filter push-down optimization. The set
+  // of valid column types is controlled by `ValidTypeMap`. Unfortunately, in parquet-mr 1.7.0 and
+  // prior versions, the limitation is too strict, and doesn't allow `BINARY (ENUM)` columns to be
+  // pushed down.
+  //
+  // This restriction is problematic for Spark SQL, because Spark SQL doesn't have a type that maps
+  // to Parquet original type `ENUM` directly, and always converts `ENUM` to `StringType`. Thus,
+  // a predicate involving a `ENUM` field can be pushed-down as a string column, which is perfectly
+  // legal except that it fails the `ValidTypeMap` check.
+  //
+  // Here we add `BINARY (ENUM)` into `ValidTypeMap` lazily via reflection to workaround this issue.
+  private lazy val relaxParquetValidTypeMap: Unit = {
+    val constructor = Class
+      .forName(classOf[ValidTypeMap].getCanonicalName + "$FullTypeDescriptor")
+      .getDeclaredConstructor(classOf[PrimitiveTypeName], classOf[OriginalType])
+
+    constructor.setAccessible(true)
+    val enumTypeDescriptor = constructor
+      .newInstance(PrimitiveTypeName.BINARY, OriginalType.ENUM)
+      .asInstanceOf[AnyRef]
+
+    val addMethod = classOf[ValidTypeMap].getDeclaredMethods.find(_.getName == "add").get
+    addMethod.setAccessible(true)
+    addMethod.invoke(null, classOf[Binary], enumTypeDescriptor)
+  }
 }
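
Because the StringType and BinaryType cases are commented out again (SPARK-11153), string and binary predicates are no longer pushed down to Parquet after this revert, while predicates on other primitive columns still are. A rough usage sketch of what that means for createFilter, treating the private[sql] object as if it were callable directly (illustrative only; the IntegerType branch is assumed from the full source, which the diff does not show):

import org.apache.spark.sql.sources
import org.apache.spark.sql.types._

val schema = StructType(Seq(
  StructField("id", IntegerType),
  StructField("name", StringType)))

// Integer comparison: a converter exists for IntegerType, so a Parquet predicate is produced.
ParquetFilters.createFilter(schema, sources.GreaterThan("id", 10))     // Some(...)

// String equality: the StringType case is commented out above, so nothing is pushed down and
// Spark evaluates the filter itself.
ParquetFilters.createFilter(schema, sources.EqualTo("name", "spark"))  // None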
