
Conversation

@liancheng
Contributor

This PR can be quite challenging to review. I'm trying to give a detailed description of the problem as well as its solution here.

When reading Parquet files, we need to specify a potentially nested Parquet schema (of type MessageType) as the requested schema for column pruning. This Parquet schema is translated from a Catalyst schema (of type StructType), which is generated by the query planner and represents all requested columns. However, this translation can be fairly complicated, for several reasons:

  1. The requested schema must conform to the real schema of the physical file to be read.

This means we have to tailor the actual file schema of every individual physical Parquet file to be read according to the given Catalyst schema. Fortunately, we are already doing this in Spark 1.5 by pushing requested schema conversion to the executor side in PR #7231.
  2. Support for schema merging.

A single Parquet dataset may consist of multiple physical Parquet files that come with different but compatible schemas. This means we may request a column path that doesn't exist in a particular physical Parquet file. All requested column paths can be nested. For example, given a Parquet file schema

message root {
  required group f0 {
    required group f00 {
      required int32 f000;
      required binary f001 (UTF8);
    }
  }
}

we may request column paths defined in the following schema:

message root {
  required group f0 {
    required group f00 {
      required binary f001 (UTF8);
      required float f002;
    }
  }

  optional double f1;
}

Notice that we pruned column path f0.f00.f000, but added f0.f00.f002 and f1.

The good news is that Parquet handles non-existing column paths properly and always returns null for them.
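
To make this concrete, here is a minimal usage sketch (not part of this PR; the dataset path is hypothetical) showing how such a merged dataset is read with the Spark 1.5 DataFrame reader:

// Enable Parquet schema merging so per-file schemas are merged into one
// logical schema; paths absent from a given file simply read as null.
val df = sqlContext.read
  .option("mergeSchema", "true")
  .parquet("hdfs:///data/events")

// f0.f00.f002 and f1 need not exist in every file; missing values are null.
df.select("f0.f00.f002", "f1").show()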
  3. The mapping from StructType to MessageType is one-to-many.

This is the most unfortunate part.

Due to historical reasons (dark histories!), schemas of Parquet files generated by different libraries have different "flavors". For example, to handle a schema with a single non-nullable column, whose type is an array of non-nullable integers, parquet-protobuf generates the following Parquet schema:

message m0 {
  repeated int32 f;
}

while parquet-avro generates another version:

message m1 {
  required group f (LIST) {
    repeated int32 array;
  }
}

and parquet-thrift spits out this:

message m2 {
  required group f (LIST) {
    repeated int32 f_tuple;
  }
}

All of them can be mapped to the following unique Catalyst schema:

StructType(
  StructField(
    "f",
    ArrayType(IntegerType, containsNull = false),
    nullable = false))

This greatly complicates Parquet requested schema construction, since the path of a given column varies in different cases. To read the array elements from files with the above schemas, we must use f for m0, f.array for m1, and f.f_tuple for m2.

In earlier Spark versions, we didn't try to fix this issue properly. Spark 1.4 and prior versions simply translate the Catalyst schema in a way more or less compatible with parquet-hive and parquet-avro, but this is broken in many other cases. Earlier revisions of Spark 1.5 only try to tailor the Parquet file schema at the first level, and ignore nested ones. This caused SPARK-10301 as well as SPARK-10005. In PR #8228, I tried to avoid the hard part of the problem and made a minimal change in CatalystRowConverter to fix SPARK-10005. However, when taking SPARK-10301 into consideration, continuing to hack CatalystRowConverter doesn't seem to be a good idea. So this PR is an attempt to fix the problem in a proper way.

For a given physical Parquet file with schema ps and a compatible Catalyst requested schema cs, we use the following algorithm to tailor ps into the resulting Parquet requested schema ps':

For each leaf column path c in cs:

  • if a corresponding Parquet column path c' can be found in ps, c' should be included in ps';
  • otherwise, we convert c to a Parquet column path c" using CatalystSchemaConverter, and include c" in ps';
  • no other column paths should exist in ps'.

Then comes the most tedious part:

Given cs, ps, and c, how do we locate c' in ps?

Unfortunately, there's no quick answer, and we have to enumerate all possible structures defined in the parquet-format spec. They are:

  1. the standard structure of nested types, and
  2. cases defined in all backwards-compatibility rules for LIST and MAP.

The core part of this PR is CatalystReadSupport.clipParquetType(), which tailors a given Parquet file schema according to a requested schema in its Catalyst form. Backwards-compatibility rules of LIST and MAP are covered in clipParquetListType() and clipParquetMapType() respectively. The column path selection algorithm is implemented in clipParquetGroupFields().
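
As a rough illustration, here is a simplified sketch of the field selection step, modeled on clipParquetGroupFields() (details may differ from the merged code):

import scala.collection.JavaConverters._
import org.apache.parquet.schema.{GroupType, Type}
import org.apache.spark.sql.types.StructType

// For each requested Catalyst field, reuse the matching field from the file
// schema when it exists (tailoring it recursively via clipParquetType), and
// otherwise synthesize a Parquet field with CatalystSchemaConverter.
private def clipParquetGroupFields(
    parquetRecord: GroupType, structType: StructType): Seq[Type] = {
  val parquetFieldMap = parquetRecord.getFields.asScala.map(f => f.getName -> f).toMap
  val toParquet = new CatalystSchemaConverter()
  structType.map { f =>
    parquetFieldMap
      .get(f.name)                           // the column path c exists in ps
      .map(clipParquetType(_, f.dataType))   // tailor it recursively into c'
      .getOrElse(toParquet.convertField(f))  // otherwise convert c to c"
  }
}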

With this PR, we no longer need to do schema tailoring in CatalystReadSupport and CatalystRowConverter. Another benefit is that we can now also read Parquet datasets consisting of files that have different physical Parquet schemas but share the same logical schema, for example, files generated by different Parquet libraries. This situation is illustrated by this test case.

Contributor Author

This is an irrelevant change, added to stop IntelliJ IDEA highlighting errors in ScalaDoc.

@liancheng force-pushed the spark-10301/fix-parquet-requested-schema branch from f32bedf to 88ab2a9 on August 28, 2015 16:48
@liancheng
Contributor Author

@rxin Considering this is a pretty major change and SPARK-10301 isn't a blocker, I'm not quite sure whether we should include this in 1.5 at this moment. Another thing to note is that, to the best of my knowledge, most (if not all) existing Parquet libraries suffer from this issue.

@SparkQA

SparkQA commented Aug 28, 2015

Test build #41749 has finished for PR 8509 at commit a53cf34.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Aug 28, 2015

Test build #41750 has finished for PR 8509 at commit 88ab2a9.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@marmbrus
Contributor

This seems risky to include in branch-1.5 given how far along in the process we are. I'd propose we instead merge a small patch that checks that the things being zipped are the same size, and if not, throws an error asking the user to turn on schema merging (the Parquet error is very confusing). We can merge this into master.
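
As a hedged sketch of that quick fix (the names parquetRequestedSchema and catalystRequestedSchema are illustrative; the actual patch is in #8515):

import org.apache.spark.sql.AnalysisException

// Fail fast with an actionable message instead of letting Parquet's confusing
// zip error surface when per-file schemas have mismatched field counts.
if (parquetRequestedSchema.getFieldCount != catalystRequestedSchema.length) {
  throw new AnalysisException(
    "Parquet file schema and requested Catalyst schema have different numbers of fields. " +
      "Consider enabling schema merging by setting spark.sql.parquet.mergeSchema to true.")
}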

@liancheng
Contributor Author

The quick fix @marmbrus mentioned has been added as part of #8515 (yhuai@b509bee).

@liancheng force-pushed the spark-10301/fix-parquet-requested-schema branch from 5004365 to f21d88e on August 30, 2015 10:12
@SparkQA

SparkQA commented Aug 30, 2015

Test build #41809 has finished for PR 8509 at commit 38644d8.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • public class JavaTrainValidationSplitExample
    • class KMeans @Since("1.5.0") (
    • class GaussianMixtureModel @Since("1.3.0") (
    • class KMeansModel @Since("1.1.0") (@Since("1.0.0") val clusterCenters: Array[Vector])
    • class PowerIterationClusteringModel @Since("1.3.0") (
    • class StreamingKMeansModel @Since("1.2.0") (
    • class StreamingKMeans @Since("1.2.0") (
    • class ChiSqSelectorModel @Since("1.3.0") (
    • class ChiSqSelector @Since("1.3.0") (
    • class ElementwiseProduct @Since("1.4.0") (
    • class IDF @Since("1.2.0") (@Since("1.2.0") val minDocFreq: Int)
    • class Normalizer @Since("1.1.0") (p: Double) extends VectorTransformer
    • class PCA @Since("1.4.0") (@Since("1.4.0") val k: Int)
    • class StandardScaler @Since("1.1.0") (withMean: Boolean, withStd: Boolean) extends Logging
    • class StandardScalerModel @Since("1.3.0") (
    • class PoissonGenerator @Since("1.1.0") (
    • class ExponentialGenerator @Since("1.3.0") (
    • class GammaGenerator @Since("1.3.0") (
    • class LogNormalGenerator @Since("1.3.0") (
    • case class Rating @Since("0.8.0") (
    • class MatrixFactorizationModel @Since("0.8.0") (
    • abstract class GeneralizedLinearModel @Since("1.0.0") (
    • class IsotonicRegressionModel @Since("1.3.0") (
    • case class LabeledPoint @Since("1.0.0") (
    • class LassoModel @Since("1.1.0") (
    • class LinearRegressionModel @Since("1.1.0") (
    • class RidgeRegressionModel @Since("1.1.0") (
    • class MultivariateGaussian @Since("1.3.0") (
    • case class BoostingStrategy @Since("1.4.0") (
    • class Strategy @Since("1.3.0") (
    • class DecisionTreeModel @Since("1.0.0") (
    • class Node @Since("1.2.0") (
    • class Predict @Since("1.2.0") (
    • class RandomForestModel @Since("1.2.0") (
    • class GradientBoostedTreesModel @Since("1.2.0") (
    • case class LimitNode(limit: Int, child: LocalNode) extends UnaryLocalNode
    • case class UnionNode(children: Seq[LocalNode]) extends LocalNode

@SparkQA

SparkQA commented Aug 30, 2015

Test build #41806 has finished for PR 8509 at commit f21d88e.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class LimitNode(limit: Int, child: LocalNode) extends UnaryLocalNode
    • case class UnionNode(children: Seq[LocalNode]) extends LocalNode

@liancheng
Contributor Author

Merging to master.

@asfgit closed this in 391e6be on Sep 1, 2015
@liancheng deleted the spark-10301/fix-parquet-requested-schema branch on September 1, 2015 10:52
liancheng added a commit to liancheng/spark that referenced this pull request Sep 3, 2015
Contributor

What about UDT?

Contributor

Looks like for a UDT, we need to call isPrimitiveCatalystType on the sqlType of this UDT?

Contributor

After chatting with @liancheng offline, we agreed that we should not handle UDT here (leave it as it is).

Contributor

Then, let's add a comment here to explain the reason.

@yhuai
Contributor

yhuai commented Sep 4, 2015

Also, if there are any changes in https://github.com/apache/spark/pull/8583/files that are not in this one, let's have a follow-up for our master branch.

Contributor

Should we also call clipParquetType for parquetKeyType? What will happen if the key is a complex type?

Contributor Author

We don't allow map key types to be complex types in Spark SQL. This is consistent with Hive.

Contributor Author

Actually, although complex map keys are not allowed while using HiveQL in Spark SQL, they are allowed otherwise, and we can read/write them from/to Parquet successfully. So we do need to handle complex map keys here.
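
For illustration, a hedged sketch of what clipping the key type as well could look like, modeled on this PR's clipParquetMapType() (the actual change landed in #8583):

import org.apache.parquet.schema.{GroupType, Types}
import org.apache.spark.sql.types.DataType

// A Parquet MAP is a group containing a repeated key/value group; clip the
// key type as well as the value type, so complex keys are pruned correctly.
private def clipParquetMapType(
    parquetMap: GroupType, keyType: DataType, valueType: DataType): GroupType = {
  val repeatedGroup = parquetMap.getType(0).asGroupType()
  val clippedRepeatedGroup = Types
    .repeatedGroup()
    .as(repeatedGroup.getOriginalType)
    .addField(clipParquetType(repeatedGroup.getType(0), keyType))   // clip the key too
    .addField(clipParquetType(repeatedGroup.getType(1), valueType))
    .named(repeatedGroup.getName)

  Types
    .buildGroup(parquetMap.getRepetition)
    .as(parquetMap.getOriginalType)
    .addField(clippedRepeatedGroup)
    .named(parquetMap.getName)
}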

Contributor Author

Added support for this in #8583.

@liancheng
Contributor Author

Since this PR has already been merged, I'm addressing all the comments in #8583, which backports this PR to branch-1.5. Will send out a separate PR later to address these issues for master.

Contributor

should be f01ElementType?

Contributor Author

Yes, this error has been fixed in #8583.

Contributor

Should we add an assert here to make sure parquetType matches catalystType?

Contributor Author

At first I thought it would be too complicated to add this assertion here, since there can be multiple Parquet representations of a single Catalyst type, and some of them may even conflict with each other. But I just realized that we can simply resort to CatalystSchemaConverter to convert parquetType to a Catalyst type and see whether the result matches catalystType. This works because the mapping from Catalyst types to Parquet types is one-to-many, so the reverse direction is unambiguous.
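
A minimal sketch of that round-trip check, assuming CatalystSchemaConverter.convert(MessageType) is available for the Parquet-to-Catalyst direction (parquetType and catalystType are the values discussed above):

import org.apache.parquet.schema.MessageType

// Wrap the single Parquet field in a message, convert it back to a Catalyst
// type, and compare. Parquet -> Catalyst is many-to-one, so this direction
// is unambiguous.
val converter = new CatalystSchemaConverter()
val convertedType = converter.convert(new MessageType("root", parquetType))
assert(
  convertedType.fields.head.dataType == catalystType,
  s"Parquet type $parquetType is not compatible with Catalyst type $catalystType")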

Contributor Author

I found that adding this assertion is still a pretty big change. Since it's only defensive and doesn't affect correctness, I'd like to do it in a separate PR.

@liancheng
Contributor Author

Now all the comments are addressed in #8583.

asfgit pushed a commit that referenced this pull request Sep 9, 2015
…or nested structs

We used to work around SPARK-10301 with a quick fix in branch-1.5 (PR #8515), but it doesn't cover the case described in SPARK-10428. So this PR backports PR #8509, which had once been considered too big a change to merge into branch-1.5 at the last minute, to fix both SPARK-10301 and SPARK-10428 for Spark 1.5. It also adds more test cases for SPARK-10428.

This PR looks big, but the essential change is only ~200 LOC. All other changes are for testing. In particular, PR #8454 is also backported here because the `ParquetInteroperabilitySuite` introduced in PR #8515 depends on it. This should be safe since #8454 only touches testing code.

Author: Cheng Lian <[email protected]>

Closes #8583 from liancheng/spark-10301/for-1.5.
liancheng added a commit to liancheng/spark that referenced this pull request Sep 9, 2015
asfgit pushed a commit that referenced this pull request Sep 10, 2015
…8509 for master

Author: Cheng Lian <[email protected]>

Closes #8670 from liancheng/spark-10301/address-pr-comments.
ashangit pushed a commit to ashangit/spark that referenced this pull request Oct 19, 2016

(cherry picked from commit fca16c5)

Conflicts:
	sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystReadSupport.scala