
Conversation

@jiangxb1987
Contributor

What changes were proposed in this pull request?

Python uses a prefetch approach: the result is read from upstream and served in another thread. If the child operator doesn't consume all the data, Task cleanup may happen before the Python-side read process finishes. This creates a race condition: the block read locks are freed during Task cleanup, and when the reader then tries to release the read lock it holds, it finds the lock has already been released and hits an AssertionError.

We should catch the AssertionError in PythonRunner to prevent it from killing the Executor.
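The race and the fix can be modeled outside Spark with a toy lock table. This is a minimal sketch under stated assumptions: `BlockLockManager`, `reader_release`, and the block id are all illustrative names, not Spark's actual classes or API.

```python
import threading

class BlockLockManager:
    """Toy stand-in for Spark's block-lock bookkeeping (illustrative, not real API)."""
    def __init__(self):
        self._locks = set()
        self._mutex = threading.Lock()

    def acquire(self, block_id):
        with self._mutex:
            self._locks.add(block_id)

    def release(self, block_id):
        with self._mutex:
            # Mirrors the assertion that trips when the lock is already gone.
            assert block_id in self._locks, f"lock for {block_id} already released"
            self._locks.remove(block_id)

    def release_all(self):
        """Task cleanup: frees every lock, including ones a reader still holds."""
        with self._mutex:
            self._locks.clear()

def reader_release(manager, block_id):
    """Simplified model of the PR's change: swallow the AssertionError
    instead of letting it propagate and kill the executor."""
    try:
        manager.release(block_id)
        return "released"
    except AssertionError:
        return "already released by task cleanup; ignored"

manager = BlockLockManager()
manager.acquire("rdd_0_0")
manager.release_all()                      # Task cleanup wins the race
print(reader_release(manager, "rdd_0_0"))  # reader survives instead of crashing
```

With the `try`/`except`, the late release degrades to a no-op rather than an executor-fatal error, which is the essence of the change described above.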

How was this patch tested?

It's hard to write a unit test for this case; manually verified with a previously failing job.


@SparkQA

SparkQA commented May 7, 2019

Test build #105176 has finished for PR 24542 at commit 0ca940c.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HyukjinKwon
Member

Merged to master.

@jiangxb1987
Contributor Author

Thanks @cloud-fan @HyukjinKwon! I'll backport this to 2.4 later.

@jiangxb1987 jiangxb1987 deleted the pyError branch May 8, 2019 05:50
jiangxb1987 added a commit to jiangxb1987/spark that referenced this pull request May 8, 2019
… in PythonRunner

## What changes were proposed in this pull request?

Python uses a prefetch approach: the result is read from upstream and served in another thread. If the child operator doesn't consume all the data, Task cleanup may happen before the Python-side read process finishes. This creates a race condition: the block read locks are freed during Task cleanup, and when the reader then tries to release the read lock it holds, it finds the lock has already been released and hits an AssertionError.

We should catch the AssertionError in PythonRunner to prevent it from killing the Executor.

## How was this patch tested?

It's hard to write a unit test for this case; manually verified with a previously failing job.

Closes apache#24542 from jiangxb1987/pyError.

Authored-by: Xingbo Jiang <[email protected]>
Signed-off-by: HyukjinKwon <[email protected]>
dongjoon-hyun pushed a commit that referenced this pull request May 8, 2019
…cutor in PythonRunner

## What changes were proposed in this pull request?

Backport #24542 to 2.4.

## How was this patch tested?

existing tests

Closes #24552 from jiangxb1987/SPARK-25139-2.4.

Authored-by: Xingbo Jiang <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
rezasafi pushed a commit to rezasafi/spark that referenced this pull request May 22, 2019
… in PythonRunner

## What changes were proposed in this pull request?

Python uses a prefetch approach: the result is read from upstream and served in another thread. If the child operator doesn't consume all the data, Task cleanup may happen before the Python-side read process finishes. This creates a race condition: the block read locks are freed during Task cleanup, and when the reader then tries to release the read lock it holds, it finds the lock has already been released and hits an AssertionError.

We should catch the AssertionError in PythonRunner to prevent it from killing the Executor.

## How was this patch tested?

It's hard to write a unit test for this case; manually verified with a previously failing job.

Closes apache#24542 from jiangxb1987/pyError.

Authored-by: Xingbo Jiang <[email protected]>
Signed-off-by: HyukjinKwon <[email protected]>
(cherry picked from commit e63fbfc)
cloud-fan pushed a commit that referenced this pull request Jun 18, 2019
…pleted

## What changes were proposed in this pull request?

PythonRunner executes a task asynchronously: elements are produced in the WriteThread but consumed in another thread. When the child operator, e.g. take()/first(), does not consume all the elements produced by the WriteThread, the task finishes before the WriteThread does and releases all locks on blocks. The WriteThread, however, keeps pulling elements from the parent operator until the iterator is exhausted; at that point it tries to release the corresponding block lock and hits an AssertionError, because the task already released that lock.

#24542 previously fixed this by catching the AssertionError, so that it doesn't fail the executor.

However, even without PySpark, the issue still exists when a user implements a custom RDD or task that spawns a separate child thread to consume an iterator from a cached parent RDD. Below is a demo that easily reproduces the issue.

```scala
    val rdd0 = sc.parallelize(Range(0, 10), 1).cache()
    rdd0.collect()
    rdd0.mapPartitions { iter =>
      val t = new Thread(new Runnable {
        override def run(): Unit = {
          while(iter.hasNext) {
            println(iter.next())
            Thread.sleep(1000)
          }
        }
      })
      t.setDaemon(false)
      t.start()
      Iterator(0)
    }.collect()
    Thread.sleep(100000)
```

So, if we prevent the separate thread from releasing the lock on a block when the TaskContext has already completed, we won't hit this issue again.
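The proposed guard can be sketched with a toy model of task completion. This is a simplified illustration under stated assumptions: `ToyTaskContext` and `release_lock` are hypothetical names standing in for Spark's TaskContext and block-manager internals, not the actual fix.

```python
import threading

class ToyTaskContext:
    """Minimal stand-in for TaskContext: only tracks completion (illustrative)."""
    def __init__(self):
        self._completed = threading.Event()

    def mark_completed(self):
        self._completed.set()

    def is_completed(self):
        return self._completed.is_set()

def release_lock(ctx, held_locks, block_id):
    """Guarded release: skip when the owning task has already completed,
    since task cleanup freed every lock on its behalf (sketch of the idea)."""
    if ctx.is_completed():
        return "skipped: task already completed"
    held_locks.remove(block_id)
    return "released"

ctx = ToyTaskContext()
locks = {"rdd_0_0"}
ctx.mark_completed()                     # task finished before the child thread
print(release_lock(ctx, locks, "rdd_0_0"))
```

Checking the completion flag before releasing means the late-running child thread never attempts a double release, so the assertion can never trip in the first place, instead of being caught after the fact.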

## How was this patch tested?

Added a new unit test in RDDSuite.

Closes #24699 from Ngone51/SPARK-27666.

Authored-by: wuyi <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
rluta pushed a commit to rluta/spark that referenced this pull request Sep 17, 2019
…cutor in PythonRunner

## What changes were proposed in this pull request?

Backport apache#24542 to 2.4.

## How was this patch tested?

existing tests

Closes apache#24552 from jiangxb1987/SPARK-25139-2.4.

Authored-by: Xingbo Jiang <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
kai-chi pushed a commit to kai-chi/spark that referenced this pull request Sep 26, 2019
…cutor in PythonRunner

## What changes were proposed in this pull request?

Backport apache#24542 to 2.4.

## How was this patch tested?

existing tests

Closes apache#24552 from jiangxb1987/SPARK-25139-2.4.

Authored-by: Xingbo Jiang <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
XUJiahua pushed a commit to XUJiahua/spark that referenced this pull request Apr 9, 2020
…cutor in PythonRunner

Backport apache#24542 to 2.4.

existing tests

Closes apache#24552 from jiangxb1987/SPARK-25139-2.4.

Authored-by: Xingbo Jiang <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
(cherry picked from commit 2f16255)
(cherry picked from commit d6cd2eb4bdac48c7d2f11faca11f53d901f6737b)
(cherry picked from commit 6d23c7a5f28cb988a3b3a820480453a6eafb0cf7)

Change-Id: I4401ec663789bcc63c17babe3fe90c3f7bb53364
(cherry picked from commit ac367b740b4e6cecf563146b189d8153106d208d)
(cherry picked from commit 3813df0ab9d48d6b1f988ac165f1cb0e00f2e4c0)