
Conversation

@JoshRosen (Contributor) commented Dec 24, 2019

What changes were proposed in this pull request?

This PR modifies ParquetRowConverter to remove unnecessary InternalRow.copy() calls for structs that are directly nested in other structs.

Why are the changes needed?

These changes can significantly improve performance when reading Parquet files that contain deeply-nested structs with many fields.

The ParquetRowConverter uses per-field Converters for handling individual fields. Internally, these converters may have mutable state and may return mutable objects. In most cases, each converter is only invoked once per Parquet record (this is true for top-level fields, for example). However, arrays and maps may call their child element converters multiple times per Parquet record: in these cases we must be careful to copy any mutable outputs returned by child converters.
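
To make the hazard concrete, here is a minimal illustrative sketch (toy code, not the converter itself) of why storing a reused mutable row without copy() leaves an array full of aliases:

  import scala.collection.mutable.ArrayBuffer
  import org.apache.spark.sql.catalyst.InternalRow
  import org.apache.spark.sql.catalyst.expressions.SpecificInternalRow
  import org.apache.spark.sql.types.IntegerType

  // A child converter reuses one mutable row across array elements:
  val reused = new SpecificInternalRow(Seq(IntegerType))
  val elements = ArrayBuffer.empty[InternalRow]

  reused.setInt(0, 1)
  elements += reused   // stores an alias, not a snapshot
  reused.setInt(0, 2)

  // elements(0) now also reads 2, because it aliases `reused`;
  // `elements += reused.copy()` would have preserved the value 1.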

In the existing code, InternalRows are copied whenever they are stored into any parent container (not just maps and arrays). This copying can be especially expensive for deeply-nested fields, since a deep copy is performed at every level of nesting.

This PR modifies the code to avoid copies for structs that are directly nested in structs; see inline code comments for an argument for why this is safe.
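
As a simplified sketch of the approach (the trait and class below are minimal stand-ins for the real ParentContainerUpdater and RowUpdater inside ParquetRowConverter; the details differ in the actual patch):

  import org.apache.spark.sql.catalyst.InternalRow

  trait ParentContainerUpdater { def set(value: Any): Unit }
  class RowUpdater extends ParentContainerUpdater {
    override def set(value: Any): Unit = ()  // writes into the parent row
  }

  def wrapStructUpdater(updater: ParentContainerUpdater): ParentContainerUpdater =
    updater match {
      // Parent is a struct: skip the defensive copy. Either the root row is
      // copied by the final UnsafeProjection, or an enclosing array/map
      // converter performs a deep copy that covers this struct too.
      case rowUpdater: RowUpdater => rowUpdater
      // Parent is an array or map: it may invoke this converter many times
      // per record, so deep-copy each completed row before storing it.
      case other => new ParentContainerUpdater {
        override def set(value: Any): Unit =
          other.set(value.asInstanceOf[InternalRow].copy())
      }
    }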

Does this PR introduce any user-facing change?

No.

How was this patch tested?

Correctness: I added new test cases to ParquetIOSuite to increase coverage of nested structs, including structs nested in arrays: previously this suite didn't test that case, so we lacked mutation coverage of this copy() code (the suite's tests still passed even when I incorrectly removed the .copy() in all cases). I also added a test for maps with struct keys and modified the existing "map with struct values" test case to include maps with two elements (since the incorrect omission of a copy() can only be detected if the map has multiple elements).
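
For illustration, a hedged sketch of the map-with-struct-values coverage described above (not the exact ParquetIOSuite code; withTempPath and checkAnswer are Spark's standard test helpers):

  case class S(i: Int, s: String)

  test("map with struct values (two entries)") {
    // Two entries are essential: with a single entry, an incorrectly
    // omitted copy() still produces a correct result and the bug hides.
    val df = Seq(Map(1 -> S(1, "a"), 2 -> S(2, "b"))).toDF("m")
    withTempPath { dir =>
      df.write.parquet(dir.getCanonicalPath)
      checkAnswer(spark.read.parquet(dir.getCanonicalPath), df)
    }
  }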

Performance: I put together a simple local benchmark demonstrating the performance problems:

First, construct a nested schema:

  case class Inner(
    f1: Int,
    f2: Long,
    f3: String,
    f4: Int,
    f5: Long,
    f6: String,
    f7: Int,
    f8: Long,
    f9: String,
    f10: Int
  )
  
  case class Wrapper1(inner: Inner)
  case class Wrapper2(wrapper1: Wrapper1)
  case class Wrapper3(wrapper2: Wrapper2)

Wrapper3's schema looks like:

root
 |-- wrapper2: struct (nullable = true)
 |    |-- wrapper1: struct (nullable = true)
 |    |    |-- inner: struct (nullable = true)
 |    |    |    |-- f1: integer (nullable = true)
 |    |    |    |-- f2: long (nullable = true)
 |    |    |    |-- f3: string (nullable = true)
 |    |    |    |-- f4: integer (nullable = true)
 |    |    |    |-- f5: long (nullable = true)
 |    |    |    |-- f6: string (nullable = true)
 |    |    |    |-- f7: integer (nullable = true)
 |    |    |    |-- f8: long (nullable = true)
 |    |    |    |-- f9: string (nullable = true)
 |    |    |    |-- f10: integer (nullable = true)

Next, generate some fake data:

  val data = spark.range(1, 1000 * 1000 * 25, 1, 1).map { i =>
    Wrapper3(Wrapper2(Wrapper1(Inner(
      i.toInt,
      i * 2,
      (i * 3).toString,
      (i * 4).toInt,
      i * 5,
      (i * 6).toString,
      (i * 7).toInt,
      i * 8,
      (i * 9).toString,
      (i * 10).toInt
    ))))
  }

  data.write.mode("overwrite").parquet("/tmp/parquet-test")

I then ran a simple benchmark consisting of

spark.read.parquet("/tmp/parquet-test").selectExpr("hash(*)").rdd.count()

where the hash(*) is designed to force decoding of all Parquet fields but avoids RowEncoder costs in the .rdd.count() stage.

In the old code, expensive copying takes place at every level of nesting; this is apparent in the following flame graph:

[flame graph image: time spent in copy() visible at every level of nesting]

After this PR's changes, the above toy benchmark runs ~30% faster.

@JoshRosen added the SQL label Dec 24, 2019
// we don't need to copy because copying will be done in the final
// UnsafeProjection, or
// 2. The path from the schema root to this field contains a map or array,
// in which case we will perform a recursive defensive copy via the
Contributor:

Correctness relies on the copy actually being a deep copy. Looking elsewhere in this file, we have comments like

    // NOTE: We can't reuse the mutable Map here and must instantiate a new `Map` for the next
    // value.  `Row.copy()` only copies row cells, it doesn't do deep copy to objects stored in row
    // cells.

which suggest that certain copying might be shallow, so it's important to double-check and make sure that the copies are indeed deep.

Here, the state being copied is an InternalRow. To be more specific, it's actually a SpecificInternalRow (I'll update the .asInstanceOf cast below to reflect this). SpecificInternalRow extends BaseGenericInternalRow and #18483 changed that to implement a deep-copy, recursively copying maps, arrays, and structs.
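
A toy illustration of the deep-copy behavior (assuming copy() recursively copies nested rows, as described above for BaseGenericInternalRow after #18483):

  import org.apache.spark.sql.catalyst.expressions.GenericInternalRow

  val inner = new GenericInternalRow(Array[Any](1))
  val outer = new GenericInternalRow(Array[Any](inner))
  val snapshot = outer.copy()  // deep copy: nested rows are copied too
  inner.update(0, 2)           // simulate later reuse of the mutable state
  assert(snapshot.getStruct(0, 1).getInt(0) == 1)  // snapshot unaffected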

Contributor:

I think the existing comment about Row.copy() is outdated, so we might be able to optimize those other parts of the code, too; I'm going to defer that to future work / another PR, though.

Contributor (Author):

Update: in #27089 I'm removing these other unnecessary ArrayBuffer copies.

@SparkQA commented Dec 24, 2019

Test build #115677 has finished for PR 26993 at commit 2ed8ea9.

  • This patch fails PySpark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Dec 24, 2019

Test build #115680 has finished for PR 26993 at commit 3fb3391.

  • This patch fails PySpark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HyukjinKwon (Member):

retest this please

@SparkQA commented Dec 24, 2019

Test build #115702 has finished for PR 26993 at commit 3fb3391.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HyukjinKwon (Member):

retest this please

@SparkQA commented Dec 24, 2019

Test build #115727 has finished for PR 26993 at commit 3fb3391.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HyukjinKwon (Member):

retest this please

@SparkQA commented Dec 24, 2019

Test build #115742 has finished for PR 26993 at commit 3fb3391.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Dec 28, 2019

Test build #115874 has finished for PR 26993 at commit fffe72b.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@JoshRosen changed the title from "[WIP][SPARK-30338][SQL] Avoid unnecessary InternalRow copies in ParquetRowConverter" to "[SPARK-30338][SQL] Avoid unnecessary InternalRow copies in ParquetRowConverter" Dec 31, 2019
@JoshRosen (Contributor, Author):

I've removed the [WIP] tag and I think this is now ready for review.

I've updated the existing "map with struct values" test so that it uses maps with multiple values (previously, we only tested with maps containing a single entry, which is insufficient to detect struct-copying problems: the old test would still pass if I completely removed the .copy() calls, whereas the new test will fail). For completeness, I also added a test for "maps with struct keys". I also changed the style of the new "struct of structs" test case to better match the style of this suite's existing tests.

new ParquetMapConverter(parquetType.asGroupType(), t, updater)

case t: StructType =>
val wrappedUpdater = {
Member:

@JoshRosen, no big deal at all but how about we put the JIRA ID somewhere in the comment?

Contributor (Author):

Good idea: I added a JIRA reference in e6945e8

@HyukjinKwon (Member):

I happened to take a cursory look and it seems pretty fine.

@SparkQA commented Dec 31, 2019

Test build #115981 has finished for PR 26993 at commit e6945e8.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HyukjinKwon (Member):

retest this please

@SparkQA commented Jan 1, 2020

Test build #116000 has finished for PR 26993 at commit e6945e8.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@joshrosen-stripe (Contributor):

I ran a "read and hash all columns" benchmark on datasets with real-world schemas; these schemas contained ~50-300+ fields at various depths of nesting. The benchmark code looked roughly like

val data = spark.read.parquet(args.list("input"): _*)
data.select(hash($"*").as("hash")).groupBy().sum("hash").collect()

Comparing the map/scan stages' Total Time Across All Tasks metrics (from the Spark UI's "Stage Details" pages), it looks like this patch's changes result in ~6-25% time savings for this benchmark.

@joshrosen-stripe (Contributor) commented Jan 6, 2020

@cloud-fan @dongjoon-hyun @viirya, could you take a look at this PR optimizing nested struct handling in ParquetRowConverter? I'm tagging this group because it looks like you've all helped to review recent changes to this file and I'd like some more eyes on this change.

// `updater` is a RowUpdater, implying that the parent container is a struct.
// We do NOT need to perform defensive copying here because either:
//
// 1. The path from the schema root to this field consists only of nested
Member:

When we have a deeply nested struct inside an array, is it the first case here?

I think it is fine because the element converter for the top-level struct inside an array element will do the defensive copying. So the nested struct converter will see a RowUpdater from the parent struct and doesn't need defensive copying either.

Maybe it would be good to also reflect this in the doc comment.

Contributor:

Yes, that's right. After thinking about this some more, I think I've come up with a clearer explanation and have updated the code comment: 4651b2f
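
To spell out the array case with a concrete illustrative schema:

  // Schema: array<struct<a: struct<x: int>>>
  //
  // - The array's element converter sees an updater that is not a RowUpdater,
  //   so it deep-copies each completed element row.
  // - The converter for the inner struct `a` sees a RowUpdater (its parent is
  //   a struct), so it can skip its own copy: the element-level deep copy()
  //   recursively copies `a` anyway.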

}
}

testStandardAndLegacyModes("array of struct") {
Member:

Do we have a test for array of struct of struct?

Contributor:

I added a new test case for this in 0f1af94

@viirya (Member) left a review comment:

Looks correct and pretty good for performance improvement.

@SparkQA commented Jan 7, 2020

Test build #116188 has finished for PR 26993 at commit 4651b2f.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Jan 7, 2020

Test build #116193 has finished for PR 26993 at commit 0f1af94.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@cloud-fan (Contributor):

thanks, merging to master!

@cloud-fan closed this in 93d3ab8 Jan 7, 2020
HyukjinKwon pushed a commit that referenced this pull request Jan 7, 2020
…lus misc. constant factors

### What changes were proposed in this pull request?

This PR implements multiple performance optimizations for `ParquetRowConverter`, achieving some modest constant-factor wins for all fields and larger wins for map and array fields:

- Add `private[this]` to several `val`s (90cebf0)
- Keep a `fieldUpdaters` array, saving two `.updater()` calls per field (7318785): I suspect that these are often megamorphic calls, so cutting them out seems like it could be a relatively large performance win.
- Only call `currentRow.numFields` once per `start()` call (e05de15): previously we'd call it once per field and this had a significant enough cost that it was visible during profiling.
- Reuse buffers in array and map converters (c7d1534, 6d16f59): previously we would create a brand-new Scala `ArrayBuffer` for each field read, but this isn't actually necessary because the data is already copied into a fresh array when `end()` constructs a `GenericArrayData`; see the sketch below.
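
A hedged sketch of that buffer-reuse pattern (a hypothetical, trimmed-down container class mirroring the idea; member names approximate, not the exact patch):

  import scala.collection.mutable.ArrayBuffer
  import org.apache.spark.sql.catalyst.util.GenericArrayData

  class ReusableArrayState(set: Any => Unit) {
    // One buffer per converter instance, reused across records:
    private[this] val currentArray = ArrayBuffer.empty[Any]

    def start(): Unit = currentArray.clear()  // reuse, don't reallocate

    def end(): Unit =
      // Reuse is safe: toArray copies the elements into a fresh array,
      // which the new GenericArrayData then owns exclusively.
      set(new GenericArrayData(currentArray.toArray))

    def addElement(v: Any): Unit = currentArray += v
  }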

### Why are the changes needed?

To improve Parquet read performance; this is complementary to #26993's (orthogonal) improvements for nested struct read performance.

### Does this PR introduce any user-facing change?

No.

### How was this patch tested?

Existing tests, plus manual benchmarking with both synthetic and realistic schemas (similar to the ones in #26993). I've seen ~10%+ improvements in scan performance on certain real-world datasets.

Closes #27089 from JoshRosen/joshrosen/more-ParquetRowConverter-optimizations.

Lead-authored-by: Josh Rosen <[email protected]>
Co-authored-by: Josh Rosen <[email protected]>
Signed-off-by: HyukjinKwon <[email protected]>