Skip to content

Conversation

@dongjoon-hyun
Copy link
Member

@dongjoon-hyun dongjoon-hyun commented Sep 16, 2019

What changes were proposed in this pull request?

PipedRDD will invoke stdinWriterThread.interrupt() at task completion, and obj.wait will get InterruptedException. However, there exists a possibility which the thread termination gets delayed because the thread starts from obj.wait() with that exception. To prevent test flakiness, we need to use eventually. Also, This PR fixes the typo in code comment and variable name.

Why are the changes needed?

- stdin writer thread should be exited when task is finished *** FAILED ***
  Some(Thread[stdin writer for List(cat),5,]) was not empty (PipedRDDSuite.scala:107)

Does this PR introduce any user-facing change?

No.

How was this patch tested?

Manual.

We can reproduce the same failure like Jenkins if we catch InterruptedException and sleep longer than the eventually timeout inside the test code. The following is the example to reproduce it.

val nums = sc.makeRDD(Array(1, 2, 3, 4), 1).map { x =>
  try {
    obj.synchronized {
      obj.wait() // make the thread waits here.
    }
  } catch {
    case ie: InterruptedException =>
      Thread.sleep(15000)
      throw ie
  }
  x
}

val result = piped.mapPartitions(_ => Array.emptyIntArray.iterator)

assert(result.collect().length === 0)
sc.listenerBus.waitUntilEmpty(10000)
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the fix and the remaining is typo fix from err to in.

Copy link
Member

@HyukjinKwon HyukjinKwon left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

@HeartSaVioR
Copy link
Contributor

Could you elaborate how it helps to resolve flakiness? I'm seeing that TaskContext.addTaskCompletionListener doesn't seem to add listeners to the listener bus - it handles these listeners by its own, so wondering which event we want to ensure being handled in listenerBus before verifying.

@SparkQA
Copy link

SparkQA commented Sep 17, 2019

Test build #110672 has finished for PR 25808 at commit 25e9f99.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@dongjoon-hyun
Copy link
Member Author

dongjoon-hyun commented Sep 17, 2019

@HeartSaVioR . What do you mean by the following?

I'm seeing that TaskContext.addTaskCompletionListener doesn't seem to add listeners to the listener bus

Listener is added like the following.

1. PipedRDD

    context.addTaskCompletionListener[Unit] { _ =>
      if (proc.isAlive) {
        proc.destroy()
      }

      if (stdinWriterThread.isAlive) {
        stdinWriterThread.interrupt()
      }
      if (stderrReaderThread.isAlive) {
        stderrReaderThread.interrupt()
      }
    }

2. TaskContext

  def addTaskCompletionListener[U](f: (TaskContext) => U): TaskContext = {
    // Note that due to this scala bug: https://github.com/scala/bug/issues/11016, we need to make
    // this function polymorphic for every scala version >= 2.12, otherwise an overloaded method
    // resolution error occurs at compile time.
    addTaskCompletionListener(new TaskCompletionListener {
      override def onTaskCompletion(context: TaskContext): Unit = f(context)
    })
  }

3. TaskContextImpl

  override def addTaskCompletionListener(listener: TaskCompletionListener)
      : this.type = synchronized {
    if (completed) {
      listener.onTaskCompletion(this)
    } else {
      onCompleteCallbacks += listener
    }
    this
  }

And, that will be invoked like the following.

  private[spark] override def markTaskCompleted(error: Option[Throwable]): Unit = synchronized {
    if (completed) return
    completed = true
    invokeListeners(onCompleteCallbacks, "TaskCompletionListener", error) {
      _.onTaskCompletion(this)
    }
  }

@dongjoon-hyun
Copy link
Member Author

Thank you for review and approval, @HyukjinKwon .

@HeartSaVioR
Copy link
Contributor

HeartSaVioR commented Sep 17, 2019

I meant the flow doesn't seem to deal with listenerBus in SparkContext. These listeners are registered to TaskContext instead of listenerBus, and Task will directly update the status to listeners.

try {
runTask(context)
} catch {
case e: Throwable =>
// Catch all errors; run task failure callbacks, and rethrow the exception.
try {
context.markTaskFailed(e)
} catch {
case t: Throwable =>
e.addSuppressed(t)
}
context.markTaskCompleted(Some(e))
throw e
} finally {
try {
// Call the task completion callbacks. If "markTaskCompleted" is called twice, the second
// one is no-op.
context.markTaskCompleted(None)
} finally {
try {
Utils.tryLogNonFatalError {
// Release memory used by this thread for unrolling blocks
SparkEnv.get.blockManager.memoryStore.releaseUnrollMemoryForThisTask(MemoryMode.ON_HEAP)
SparkEnv.get.blockManager.memoryStore.releaseUnrollMemoryForThisTask(
MemoryMode.OFF_HEAP)
// Notify any tasks waiting for execution memory to be freed to wake up and try to
// acquire memory again. This makes impossible the scenario where a task sleeps forever
// because there are no other tasks left to notify it. Since this is safe to do but may
// not be strictly necessary, we should revisit whether we can remove this in the
// future.
val memoryManager = SparkEnv.get.memoryManager
memoryManager.synchronized { memoryManager.notifyAll() }
}
} finally {
// Though we unset the ThreadLocal here, the context member variable itself is still
// queried directly in the TaskRunner to check for FetchFailedExceptions.
TaskContext.unset()
InputFileBlockHolder.unset()
}
}
}
}

Here both TaskContext.markTaskFailed and TaskContext.markTaskCompleted is called from Task.run. So once Task.run is guaranteed to be finished before verifying, listener in PipedRDD should be executed as well.

@dongjoon-hyun
Copy link
Member Author

Oh, let me take a look at that once more.

@dongjoon-hyun
Copy link
Member Author

Got it. I was confused during following the code. After context.markTaskCompleted(Some(e)), it's directly called as you mentioned. I'll try to find another way. Thanks!

@HeartSaVioR
Copy link
Contributor

Yeah, actually I also don't get the reason why such thread is shown afterwards, as collect() would ensure task is finished if my understanding is correct.

@dongjoon-hyun
Copy link
Member Author

It seems that it will take some time. I'll close this PR first to avoid further confusion. Thank you for feedbacks, @HyukjinKwon and @HeartSaVioR !

@dongjoon-hyun
Copy link
Member Author

It seems that there exists another possibility. I'll test and reopen this PR.

@dongjoon-hyun dongjoon-hyun reopened this Sep 17, 2019
@dongjoon-hyun dongjoon-hyun changed the title [SPARK-29104][CORE][TESTS] Fix PipedRDDSuite to wait messages before checking thread termination [SPARK-29104][CORE][TESTS] Fix PipedRDDSuite to use eventually to check thread termination Sep 17, 2019
@dongjoon-hyun
Copy link
Member Author

dongjoon-hyun commented Sep 17, 2019

Hi, @HyukjinKwon and @HeartSaVioR . This is the second try to fix the PipedRDDSuite flakiness. Could you review this again, please?


assert(stderrWriterThread.isEmpty)
// SPARK-29104 PipedRDD will invoke `stdinWriterThread.interrupt()` at task completion,
// and `obj.wait` will get InterruptedException. However, there exists a possibility
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the updated patch. Much better with artificially reproducing the issue.

I'm trying to understand the situation - for the sake of understanding, interrupting thread would throw InterruptedException when thread is waiting via obj.wait() and stdinWriterThread will catch it, and run() will finish.

I'm trying to understand the problematic situation as well - if Thread.interrupt() occurs before obj.wait(), the thread will keep running until there's some method understanding Thread's interrupted state (like Object.wait() which will throw InterruptedException even it was thrown earlier) so there's possible delay for thread to be finished. Could you confirm my understanding?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The second flow is correct. However, the first one is slightly differently described. stdinWriterThread is not catching the exception. The task thread is just marked as interrupted status and restarts from the after obj.wait bytecode with InterruptedException. However, this Task thread can be slow and the test code grabs the Thread.getAllStackTraces before the Task thread escaped from the run function.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I played with debugger for the first case as second case cannot be reproduced easily but first case would be always reproduced. It's slightly different though no big deal.

With debugger, for normal case with current master branch, stdinWriterThread catches the InterruptedException and escape run() function after executing catch/finally statements. stderrReaderThread doesn't catch the InterruptedException as it's already destroyed when we try to interrupt.

We don't join these threads for task completion anyway so it seems to be also possible even for first case to let test code verify earlier than let stdinWriterThread be destroyed. Maybe you've already explained same and I misunderstood ;( If that's the case, I'm sorry to bother you.

@SparkQA
Copy link

SparkQA commented Sep 17, 2019

Test build #110737 has finished for PR 25808 at commit 918cbd7.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • class PipedRDDSuite extends SparkFunSuite with SharedSparkContext with Eventually

@HyukjinKwon
Copy link
Member

retest this please

Copy link
Contributor

@HeartSaVioR HeartSaVioR left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

@SparkQA
Copy link

SparkQA commented Sep 17, 2019

Test build #110747 has finished for PR 25808 at commit 918cbd7.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • class PipedRDDSuite extends SparkFunSuite with SharedSparkContext with Eventually

@HyukjinKwon
Copy link
Member

Merged to master

@dongjoon-hyun
Copy link
Member Author

dongjoon-hyun commented Sep 17, 2019

Thank you so much for helping me in this PR, @HyukjinKwon and @HeartSaVioR !
I'll backport this to branch-2.4, too. I noticed that this test is only at master.

dongjoon-hyun added a commit that referenced this pull request Sep 18, 2019
…heck thread termination

### What changes were proposed in this pull request?

`PipedRDD` will invoke `stdinWriterThread.interrupt()` at task completion, and `obj.wait` will get `InterruptedException`. However, there exists a possibility which the thread termination gets delayed because the thread starts from `obj.wait()` with that exception. To prevent test flakiness, we need to use `eventually`. Also, This PR fixes the typo in code comment and variable name.

### Why are the changes needed?

```
- stdin writer thread should be exited when task is finished *** FAILED ***
  Some(Thread[stdin writer for List(cat),5,]) was not empty (PipedRDDSuite.scala:107)
```

- https://amplab.cs.berkeley.edu/jenkins/view/Spark%20QA%20Test%20(Dashboard)/job/spark-master-test-maven-hadoop-2.7/6867/testReport/junit/org.apache.spark.rdd/PipedRDDSuite/stdin_writer_thread_should_be_exited_when_task_is_finished/

### Does this PR introduce any user-facing change?

No.

### How was this patch tested?

Manual.

We can reproduce the same failure like Jenkins if we catch `InterruptedException` and sleep longer than the `eventually` timeout inside the test code. The following is the example to reproduce it.
```scala
val nums = sc.makeRDD(Array(1, 2, 3, 4), 1).map { x =>
  try {
    obj.synchronized {
      obj.wait() // make the thread waits here.
    }
  } catch {
    case ie: InterruptedException =>
      Thread.sleep(15000)
      throw ie
  }
  x
}
```

Closes #25808 from dongjoon-hyun/SPARK-29104.

Authored-by: Dongjoon Hyun <[email protected]>
Signed-off-by: HyukjinKwon <[email protected]>
(cherry picked from commit 34915b2)
Signed-off-by: Dongjoon Hyun <[email protected]>
@dongjoon-hyun
Copy link
Member Author

This is backported to branch-2.4, too.

@dongjoon-hyun dongjoon-hyun deleted the SPARK-29104 branch September 18, 2019 16:09
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants