[SPARK-7889] [CORE] HistoryServer to refresh cache of incomplete applications #6935
Conversation
Test build #35453 has finished for PR 6935 at commit
Test build #35454 has finished for PR 6935 at commit
style check. Not enough spaces, by the look of things.
@steveloughran, I think you always put too many spaces in some places.
I should add that I did think about actually using the last-updated information to decide whether to refresh or not, and decided simply having a timeout was a good enough step #1.
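For illustration, a minimal sketch of what that timeout-based decision looks like; this is not the patch's actual code, and lastLoaded / refreshIntervalMs are hypothetical names:

object RefreshCheck {
  // Sketch only: decide whether a cached, incomplete application UI is stale.
  // `lastLoaded` and `refreshIntervalMs` are illustrative names, not the PR's.
  def needsRefresh(completed: Boolean, lastLoaded: Long, now: Long,
      refreshIntervalMs: Long): Boolean = {
    // Completed apps never change, so only incomplete ones are ever refreshed.
    !completed && (now - lastLoaded) >= refreshIntervalMs
  }
}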
@steveloughran Have you tested it? I don't think it's ok.
I probably tested different bits of it ... I verified that updated apps were coming in, but no, not a full functional test of the Web UI, as that would take a lot of preamble to set up those tests. I do have all the code to set up such tests in SPARK-1537, but was trying to keep things separate... if we get that in then this would be easy to add as a new test case.
@steveloughran @XuTingjun sorry I finally took a closer look here. So I tried running a manual test on this patch and #6545, and it seems to me that neither one solves the problem. (Or perhaps I am misunderstanding the issue.) Here are the steps I took: …
@XuTingjun it sounded like you had done some manual testing on #6545 and believe that it should work. Is there something wrong that I'm doing with my test? Unless I'm testing the wrong thing, I think this demonstrates the need to write a unit test which basically does the same thing, as it's hard to verify the complete correctness here. I'm writing that test case now.
If you want to do tests, grab the code; the tests themselves aren't that complex, but the underlying runner bits to bring up web pages, run probes once they are up, etc., are a lot of foundational pain.
sorry I didn't see that comment about the tests ... I wrote something up which mimics the existing tests:
From 2dddcd036a4cacf31d1ff5f15417fcdbcfdbc22e Mon Sep 17 00:00:00 2001
From: Imran Rashid <irashid@cloudera.com>
Date: Fri, 26 Jun 2015 14:16:16 -0500
Subject: [PATCH] failing test case
---
.../spark/deploy/history/HistoryServerSuite.scala | 74 +++++++++++++++++++++-
1 file changed, 72 insertions(+), 2 deletions(-)
diff --git a/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala
index e5b5e1b..a288490 100644
--- a/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala
@@ -21,14 +21,24 @@ import java.net.{HttpURLConnection, URL}
import java.util.zip.ZipInputStream
import javax.servlet.http.{HttpServletRequest, HttpServletResponse}
+import scala.concurrent.duration._
+import scala.language.postfixOps
+
import com.google.common.base.Charsets
import com.google.common.io.{ByteStreams, Files}
import org.apache.commons.io.{FileUtils, IOUtils}
+import org.json4s.JsonAST.JArray
+import org.json4s.jackson.JsonMethods._
import org.mockito.Mockito.when
+import org.openqa.selenium.WebDriver
+import org.openqa.selenium.htmlunit.HtmlUnitDriver
+import org.scalatest.concurrent.Eventually
+import org.scalatest.selenium.WebBrowser
import org.scalatest.{BeforeAndAfter, Matchers}
import org.scalatest.mock.MockitoSugar
-import org.apache.spark.{JsonTestUtils, SecurityManager, SparkConf, SparkFunSuite}
+import org.apache.spark.util.Utils
+import org.apache.spark._
import org.apache.spark.ui.SparkUI
/**
@@ -43,7 +53,7 @@ import org.apache.spark.ui.SparkUI
* are considered part of Spark's public api.
*/
class HistoryServerSuite extends SparkFunSuite with BeforeAndAfter with Matchers with MockitoSugar
- with JsonTestUtils {
+ with JsonTestUtils with Eventually with WebBrowser {
private val logDir = new File("src/test/resources/spark-events")
private val expRoot = new File("src/test/resources/HistoryServerExpectations/")
@@ -264,6 +274,66 @@ class HistoryServerSuite extends SparkFunSuite with BeforeAndAfter with Matchers
justHrefs should contain(link)
}
+ test("incomplete apps get refreshed") {
+
+ implicit val webDriver: WebDriver = new HtmlUnitDriver
+ implicit val formats = org.json4s.DefaultFormats
+
+ val testDir = Utils.createTempDir()
+ testDir.deleteOnExit()
+
+ val myConf = new SparkConf()
+ .set("spark.history.fs.logDirectory", testDir.getAbsolutePath)
+ .set("spark.eventLog.dir", testDir.getAbsolutePath)
+ .set("spark.history.fs.update.interval", "1")
+ .set("spark.eventLog.enabled", "true")
+ .set("spark.history.cache.interval", "1")
+ .remove("spark.testing")
+ val provider = new FsHistoryProvider(myConf)
+ val securityManager = new SecurityManager(myConf)
+
+ val server = new HistoryServer(myConf, provider, securityManager, 18080)
+ val sc = new SparkContext("local", "test", myConf)
+ server.initialize()
+ server.bind()
+ val port = server.boundPort
+ try {
+ val d = sc.parallelize(1 to 10)
+ d.count()
+ val appId = eventually(timeout(20 seconds), interval(100 milliseconds)) {
+ val json = getContentAndCode("applications", port)._2.get
+ val apps = parse(json).asInstanceOf[JArray].arr
+ apps.size should be (1)
+ (apps(0) \ "id").extract[String]
+ }
+
+ def getNumJobs(suffix: String): Int = {
+ go to (s"http://localhost:$port/history/$appId$suffix")
+ findAll(cssSelector("tbody tr")).toIndexedSeq.size
+ }
+
+ getNumJobs("") should be (1)
+ d.count()
+ eventually(timeout(10 seconds), interval(100 milliseconds)) {
+ // this fails with patch https://github.com/apache/spark/pull/6935 as well, I'm not sure why
+ getNumJobs("") should be (2)
+ }
+
+ getNumJobs("/jobs") should be (2)
+ // Here is where we still have an error. /jobs is handled by another servlet,
+ // not controlled by the app cache. We need some way for each of those servlets
+ // to know that they can get expired, so they can be detached and then the reloaded
+ // one can get re-attached
+ d.count()
+ getNumJobs("/jobs") should be (3)
+
+ } finally {
+ sc.stop()
+ server.stop()
+ }
+
+ }
+
def getContentAndCode(path: String, port: Int = port): (Int, Option[String], Option[String]) = {
HistoryServerSuite.getContentAndCode(new URL(s"http://localhost:$port/api/v1/$path"))
}
--
2.2.2

patch #6545 is closer to working (for some reason the cache does not seem to be getting refreshed in this PR; maybe my test is set up wrong). But I do think the cache with a refresh interval is probably a better way to go, rather than reloading the data on every single request. I have a rough understanding of what is going wrong when we make requests to … Also I just realized, the same problem applies to the rest api at …
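As a rough, hypothetical sketch of that "cache with a refresh interval" idea (not the code in either PR), using Guava's LoadingCache, which Spark already depends on; loadAppUI and the sizes here are placeholders:

import java.util.concurrent.TimeUnit

import com.google.common.cache.{CacheBuilder, CacheLoader, LoadingCache}

// Sketch: cache loaded UIs and let entries expire after a fixed interval, so a
// later request triggers a reload instead of reloading on every single request.
// `loadAppUI` stands in for whatever the history provider actually exposes.
class AppUICacheSketch(refreshIntervalSeconds: Long, loadAppUI: String => AnyRef) {
  private val cache: LoadingCache[String, AnyRef] = CacheBuilder.newBuilder()
    .maximumSize(50)
    .expireAfterWrite(refreshIntervalSeconds, TimeUnit.SECONDS)
    .build(new CacheLoader[String, AnyRef] {
      override def load(appId: String): AnyRef = loadAppUI(appId)
    })

  def get(appId: String): AnyRef = cache.get(appId)
}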
@squito, thank you very much, I agree with what you said.
@steveloughran do you think you can pick this up with the test case? I think we have a little better understanding of the problem. I do think your approach is in the right direction.
yeah, I think this patch also can't refresh. It doesn't detach the handlers.
I like the selenium test; it could be combined with the provider I wrote, which lets us programmatically create our own history, so we can add changes to look for.
(BTW, I'm not going to be looking at this for a couple of weeks; focusing on a (big) patch.)
Force-pushed from d96f29a to b8deeee
Test build #46534 has finished for PR 6935 at commit
Test build #46535 has finished for PR 6935 at commit
Test build #46537 has finished for PR 6935 at commit
As discussed on #6545, I think we could actually switch to live UI playback, provided the number of cached UIs was kept down. What must be done here is keeping a limit on the number of threads doing playback; a pool will be needed equal to the size of the loaded UIs, or, if the provider starts them, the number of cached UIs must be limited and, when a UI is removed from the cache, its thread has to be returned to the pool. This could not only handle incomplete applications, it should be able to offer a more responsive load of completed ones: the UI could be registered while event playback was in progress.
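A rough sketch of that bounded-pool idea, with all names illustrative rather than an actual Spark API: a semaphore caps concurrent playback, and evicting a UI from the cache returns its slot to the pool.

import java.util.concurrent.Semaphore

import com.google.common.cache.{Cache, CacheBuilder, RemovalListener, RemovalNotification}

// Illustrative only: cap the number of UIs replaying events at once; removing a
// UI from the cache releases its playback slot back to the pool.
class PlaybackPoolSketch(maxLiveUIs: Int) {
  private val slots = new Semaphore(maxLiveUIs)

  private val cache: Cache[String, AnyRef] = CacheBuilder.newBuilder()
    .maximumSize(maxLiveUIs)
    .removalListener(new RemovalListener[String, AnyRef] {
      override def onRemoval(n: RemovalNotification[String, AnyRef]): Unit = {
        // stop the playback thread for n.getValue() here, then free its slot
        slots.release()
      }
    })
    .build[String, AnyRef]()

  def startPlayback(appId: String, ui: AnyRef): Unit = {
    slots.acquire()       // blocks if too many UIs are already replaying
    cache.put(appId, ui)  // evicting an older entry releases a permit
  }
}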
Proposal: Some providers (maybe YARN ATS at some point in the future) can do incremental playback. Even the filesystem one could, if it cached the file offset on the previous load and re-opened on updates (HDFS makes no guarantees about when/whether changes to in-progress writes become visible to existing input streams, only that new streams will see the data). These could be explicitly asked to refresh/update their entries, with state kept in the cache. History providers which don't support incremental updates to in-progress applications would have to be updated by a complete reload.
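A sketch of what that incremental filesystem replay could look like, assuming a plain local file and a hypothetical replayLine callback in place of real event-log parsing; re-opening the stream on every update is the point the HDFS caveat above is about.

import java.io.{File, RandomAccessFile}

// Illustrative only: remember how far into the event log the last load got, and
// on an update replay only the bytes appended since then.
class IncrementalReplaySketch(log: File, replayLine: String => Unit) {
  private var offset: Long = 0L

  def update(): Unit = {
    val len = log.length()
    if (len > offset) {
      // Re-open on every update: a stream opened earlier may never observe data
      // appended after it was opened, whereas a fresh stream will.
      val raf = new RandomAccessFile(log, "r")
      try {
        raf.seek(offset)
        var line = raf.readLine()
        while (line != null) {
          replayLine(line)
          line = raf.readLine()
        }
        offset = raf.getFilePointer
      } finally {
        raf.close()
      }
    }
  }
}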
Test build #46602 has finished for PR 6935 at commit
This is a converged patch which …
There's nothing for Spark UI tests (there's the template above), or the REST API.
Test build #46635 has finished for PR 6935 at commit
… other details (monitoring.md wasn't up to date with the incomplete apps feature)
… from the loadedUI state and probes; makes for a leaner API
…rectly: no more cache and relay of state
...thx for the feedback. I'm fixing the merge, which is now triggering a regression (maybe a race condition in test startup): apps should go from incomplete -> complete; that's one of the recurrent issues we have today. When the attempt is replayed, its complete flag is changed and the fs history …
...I should add that it depends on the head attempt in the list being complete; the filter in HistoryServer is very sensitive to ordering. If there's an incomplete history older than a complete one, the app is considered incomplete.
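To make that ordering sensitivity concrete, a tiny illustrative sketch (AttemptInfo here is a stand-in, not the real FsApplicationAttemptInfo):

// If completeness is derived from the head of the attempt list, an incomplete
// attempt that happens to sort first makes the whole app look incomplete.
case class AttemptInfo(attemptId: Option[String], completed: Boolean)

def appLooksComplete(attempts: Seq[AttemptInfo]): Boolean =
  attempts.headOption.exists(_.completed)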
…dentified: SPARK-10873's switch to dynamic tables means scans of static HTML fails
Force-pushed from e0ad26d to 728b12c
rebased against master; switched from a scan of the HTML view to the REST API to enumerate listings of complete/incomplete apps; added @squito's ? arg redirection and a test
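For reference, enumerating via the REST API rather than scraping HTML amounts to hitting /api/v1/applications with the status filter; a minimal sketch, with the port and the lack of JSON parsing being illustrative:

import java.net.URL

import scala.io.Source

// Sketch: list complete vs. incomplete apps through the history server's REST
// API instead of scanning the rendered HTML tables.
def listApps(port: Int, status: String): String = {
  val url = new URL(s"http://localhost:$port/api/v1/applications?status=$status")
  val in = url.openStream()
  try Source.fromInputStream(in).mkString finally in.close()  // JSON array of app summaries
}

// e.g. listApps(18080, "running") vs. listApps(18080, "completed")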
Jenkins, test this please
 * This includes registering metrics with [[metricRegistry]]
 */
private def init(): Unit = {
  allMetrics.foreach(e =>
nit: I think this is a bit clearer with pattern matching: .foreach { case (name, metric) => ...
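In other words, a standalone sketch of the suggested style, with an illustrative metrics map rather than the patch's real types:

// Destructure the (name, metric) pair instead of reaching through e._1 / e._2.
val allMetrics: Map[String, Long] = Map("lookupCount" -> 42L, "evictionCount" -> 3L)

allMetrics.foreach { case (name, metric) =>
  println(s"registering $name = $metric")
}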
Test build #2522 has finished for PR 6935 at commit
| logDebug(s"Attempt ${prevInfo.name}/${prevInfo.appId} size => $size") | ||
| Some(new FsApplicationAttemptInfo(prevInfo.logPath, prevInfo.name, prevInfo.appId, | ||
| prevInfo.attemptId, prevInfo.startTime, prevInfo.endTime, prevInfo.lastUpdated, | ||
| prevInfo.sparkUser, prevInfo.completed, size)) |
this is the part which got me thinking about the change in the alternate version in #11118. I thought it seemed wrong that you were using prevInfo.completed -- if the app had transitioned to completed, wouldn't you need to put the new status in? Well, it turns out you don't, because checkForLogs / mergeApplicationListings will already re-scan the existing, in-progress attempts and update them if necessary (though there is a filesize vs. timestamp issue). And that is ultimately what transitions the app to complete status, since this can't do it.
And that led me to think that maybe we should just leverage that scan behavior rather than doing something more complicated.
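A sketch of the re-scan decision being described, with illustrative names rather than the provider's real fields; it also shows where the "filesize vs. timestamp" caveat bites:

// Reprocess an attempt when its log has grown since the last scan. Comparing
// only sizes misses updates that change the modification time but not the
// length, which is the filesize vs. timestamp issue mentioned above.
case class ScannedLog(path: String, size: Long, modTime: Long)

def shouldReplay(prev: Option[ScannedLog], current: ScannedLog): Boolean =
  prev match {
    case Some(p) => current.size > p.size // grown since last scan => replay
    case None    => true                  // never seen before => replay
  }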
PS: can you close this one now?
This patch pulls all the application cache logic out of the HistoryServer and into its own class, with callbacks for operations (get &c). This makes unit tests straightforward with a small bit of mocking of SparkUI.
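Roughly, the shape being described is something like the following sketch; the trait and method names are illustrative, not the patch's exact API:

// The cache owns lookup and expiry policy; a small operations trait supplies
// get/attach/detach, so tests can pass a mock instead of a full HistoryServer.
trait CacheOperationsSketch {
  def getAppUI(appId: String, attemptId: Option[String]): Option[AnyRef]
  def attachSparkUI(ui: AnyRef): Unit
  def detachSparkUI(ui: AnyRef): Unit
}

class ApplicationCacheSketch(ops: CacheOperationsSketch) {
  private val loaded = scala.collection.mutable.Map[String, AnyRef]()

  def get(appId: String): Option[AnyRef] =
    loaded.get(appId).orElse {
      val ui = ops.getAppUI(appId, None)
      ui.foreach { u => ops.attachSparkUI(u); loaded(appId) = u }
      ui
    }
}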