
Conversation

@JoshRosen (Contributor) commented Oct 9, 2019

What changes were proposed in this pull request?

This PR fixes a thread-safety bug in `SparkSession.createDataset(Seq)`: if the caller-supplied `Encoder` is used in multiple threads, then `createDataset`'s usage of the encoder may lead to incorrect / corrupt results, because the `Encoder`'s internal mutable state will be updated from multiple threads.

Here is an example demonstrating the problem:

```scala
import org.apache.spark.sql._

val enc = implicitly[Encoder[(Int, Int)]]

val datasets = (1 to 100).par.map { _ =>
  val pairs = (1 to 100).map(x => (x, x))
  spark.createDataset(pairs)(enc)
}

datasets.reduce(_ union _).collect().foreach {
  pair => require(pair._1 == pair._2, s"Pair elements are mismatched: $pair")
}
```

Before this PR's change, the above example fails because Spark produces corrupted records where different input records' fields have been co-mingled.
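To see why shared mutable state co-mingles records even outside Spark, here is a minimal, self-contained sketch in plain Scala. `ReusedBufferEncoder` and its `copy()` method are hypothetical stand-ins for the real `ExpressionEncoder`, not Spark's API:

```scala
// Hypothetical stand-in for an encoder that serializes into one reused
// buffer, the way ExpressionEncoder reuses a single row object.
class ReusedBufferEncoder {
  private val buffer = new Array[Int](2) // internal mutable state

  def toRow(pair: (Int, Int)): Array[Int] = {
    buffer(0) = pair._1
    buffer(1) = pair._2
    buffer // every call hands back the SAME array
  }

  // Defensive copy: a fresh instance with its own buffer.
  def copy(): ReusedBufferEncoder = new ReusedBufferEncoder
}

val shared = new ReusedBufferEncoder
val first = shared.toRow((1, 1))
shared.toRow((2, 2)) // silently clobbers `first` in place

// `first` no longer holds (1, 1); with concurrent callers the clobbering
// interleaves mid-write and fields from different records get mixed.
println(first.mkString(",")) // prints "2,2"

// Giving each caller its own copy (this PR's approach) avoids the sharing:
val isolated = shared.copy().toRow((3, 3))
```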

This bug is similar to SPARK-22355 / #19577, which fixed the same kind of problem in `Dataset.collect()`.

The fix implemented here is based on #24735's updated version of the `Dataset.collect()` bugfix: use `.copy()`. For consistency, I used the same code comment / explanation as that PR.

Does this PR introduce any user-facing change?

No.

How was this patch tested?

Tested manually using the example listed above.

Thanks to @smcnamara-stripe for identifying this bug.

@JoshRosen JoshRosen added the SQL label Oct 9, 2019
@JoshRosen JoshRosen requested review from cloud-fan and zsxwing October 9, 2019 20:44
@JoshRosen (Contributor, Author) commented Oct 9, 2019

I'll submit a separate patch for 2.4.x.

Actually, this is a clean merge with 2.4.x, so we can merge this PR to both branches.

@JoshRosen changed the title from [SPARK-29419][SQL] Fix Encoder thread-safety issue in createDataset(Seq) to [SPARK-29419][SQL] Fix Encoder thread-safety bug in createDataset(Seq) on Oct 9, 2019
@zsxwing (Member) commented Oct 9, 2019

> the caller-supplied Encoder is used in multiple threads

I would say this is a user error, though I agree it's pretty easy to overlook; I made the same mistake myself when I first used Encoder. But if we want to fix all of these problems, every public API accepting an Encoder would need the copy. I did some research on this and found a noticeable performance regression in our internal benchmark. That's why I ultimately just submitted #25209 to make it easy for users to copy an Encoder instead.

@JoshRosen (Contributor, Author) commented Oct 9, 2019

> But if we would like to fix all these problems, all public APIs accepting Encoder will need the copy.

I think that most existing uses of Encoders are de-facto thread-safe, because either (a) the use occurs inside of a Spark task, and each task gets its own fresh copy of the Encoder when the task is deserialized, or (b) the use occurs on the driver, but the code calls resolveAndBind (which internally performs a copy) prior to using the Encoder.

Given this, I suspect that this might be the only non-thread-safe Encoder usage in Spark (excluding code which is only used in Spark's unit tests). I don't think that we need to introduce similar copying in other public APIs.
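Pattern (a) can be illustrated outside Spark: a Java-serialization round-trip (which is effectively how a task receives its closure) always yields a distinct object, so per-task mutable state is never shared. The class names below are made up for illustration; this is not Spark's actual task-deserialization code:

```scala
import java.io._

// Hypothetical stateful, serializable object standing in for an Encoder.
class StatefulThing extends Serializable {
  var scratch: Int = 0 // mutable state
}

// Serialize and immediately deserialize, as task dispatch effectively does.
def roundTrip[T <: Serializable](obj: T): T = {
  val bytes = new ByteArrayOutputStream()
  val out = new ObjectOutputStream(bytes)
  out.writeObject(obj)
  out.close()
  val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
  in.readObject().asInstanceOf[T]
}

val driverSide = new StatefulThing
val taskSide = roundTrip(driverSide) // the "fresh copy" each task receives

taskSide.scratch = 42
// driverSide is untouched: deserialization produced an independent object.
println(driverSide.scratch) // prints "0"
```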

> I did some research about this and found some noticeable performance regression in our internal benchmark.

What do you think about improving the performance / reducing the cost of .copy() by refactoring the ExpressionEncoder class such that (a) all of the immutable vals become fields of the case class, (b) the current constructor becomes a .apply() on the companion object and the case class constructor becomes private, and (c) resolveAndBind calls the companion object constructor instead of copy()? Given this, I think copy() could be really cheap, effectively giving us a fresh copy of the internal mutable state but copying all other immutable attributes without performing any re-resolution, analysis, attribute binding, etc.

If we do that, we'd be able to defensively copy at very low cost (e.g. one object allocation) and then could copy-by-default and free users from having to worry about thread-safety.
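As a rough sketch of that shape (illustrative names only; the real `ExpressionEncoder` carries far more state), the idea is that the expensive analysis happens once in the companion's factory, while `copy()` only re-allocates the mutable scratch state:

```scala
// Immutable, already-analyzed state lives in constructor fields; the
// constructor is private so all construction funnels through the factory.
final class CheapToCopyEncoder private (val schema: Seq[String]) {

  // Mutable scratch buffer: created per instance, never shared via copy().
  private val buffer = new Array[Any](schema.length)

  def toRow(values: Seq[Any]): Array[Any] = {
    values.copyToArray(buffer)
    buffer
  }

  // copy() reuses the analyzed schema as-is: one object allocation,
  // with no re-resolution / re-binding work.
  def copy(): CheapToCopyEncoder = new CheapToCopyEncoder(schema)
}

object CheapToCopyEncoder {
  // The expensive work (resolution, binding, codegen in the real class)
  // happens once, here.
  def apply(rawFields: Seq[String]): CheapToCopyEncoder =
    new CheapToCopyEncoder(rawFields.map(_.trim)) // stand-in for analysis
}

val original = CheapToCopyEncoder(Seq(" a ", "b"))
val defensive = original.copy() // cheap: fresh buffer, same schema
```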

I think that's a potentially huge win from a developer productivity point-of-view: the cost / toil of having to worry about thread-unsafe code is a tax placed on end users and creates a developer education / training burden, so I think it's worth attempting to eliminate this entire class of pitfall.

@cloud-fan (Contributor)

> reducing the cost of .copy() by refactoring the ExpressionEncoder class

That sounds like a good idea to me. Can we do that first?

@JoshRosen (Contributor, Author)

> That sounds like a good idea to me. Can we do that first?

I'll prototype this. If I get it working then I'll open a second PR and will ping / link it here.

@dongjoon-hyun (Member)

Hi, @JoshRosen. Is there any update on this PR?

@JoshRosen (Contributor, Author)

Hi @dongjoon-hyun,

I spent some time prototyping a refactoring of ExpressionEncoder which separates the mutable and immutable state (in order to significantly reduce the cost of .copy()). This is doable, but it ends up involving a lot of code movement and potentially some duplication (since some helper logic is needed both during and after construction). I think that's definitely the right long-term approach, but I'll need some more time to figure out a minimally invasive and clean way of making that change.

@dongjoon-hyun (Member)

Thank you for letting me know, @JoshRosen. (I forgot to comment back here.)
I was interested in this because this is a correctness issue.

@HyukjinKwon (Member)

ok to test

@HyukjinKwon (Member)

Shall we merge this one first since 3.0 release is pretty close now?

@HyukjinKwon (Member) left a review comment

Looks okay as a band-aid fix at this moment.

@HyukjinKwon (Member)

WDYT @zsxwing, @JoshRosen, @dongjoon-hyun?

@SparkQA commented Feb 28, 2020

Test build #119064 has finished for PR 26076 at commit 62de678.

  • This patch fails PySpark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HyukjinKwon (Member)

retest this please

@SparkQA commented Feb 28, 2020

Test build #119081 has finished for PR 26076 at commit 62de678.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HyukjinKwon (Member)

retest this please

@SparkQA commented Feb 28, 2020

Test build #119088 has finished for PR 26076 at commit 62de678.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HyukjinKwon (Member)

Roughly synced with @JoshRosen offline. I am going to merge this.

Merged to master, branch-3.0 and branch-2.4.

HyukjinKwon pushed a commit that referenced this pull request Mar 2, 2020
Closes #26076 from JoshRosen/SPARK-29419.

Authored-by: Josh Rosen <[email protected]>
Signed-off-by: HyukjinKwon <[email protected]>
(cherry picked from commit f4499f6)
Signed-off-by: HyukjinKwon <[email protected]>
HyukjinKwon pushed a commit that referenced this pull request Mar 2, 2020
Closes #26076 from JoshRosen/SPARK-29419.

Authored-by: Josh Rosen <[email protected]>
Signed-off-by: HyukjinKwon <[email protected]>
(cherry picked from commit f4499f6)
Signed-off-by: HyukjinKwon <[email protected]>
@dongjoon-hyun (Member) commented Mar 2, 2020

Thank you, @JoshRosen and @HyukjinKwon.

sjincho pushed a commit to sjincho/spark that referenced this pull request Apr 15, 2020
Closes apache#26076 from JoshRosen/SPARK-29419.

Authored-by: Josh Rosen <[email protected]>
Signed-off-by: HyukjinKwon <[email protected]>