Skip to content

Commit ac2c6cd

Browse files
zhli1142015HeartSaVioR
authored andcommitted
[SPARK-32024][WEBUI] Update ApplicationStoreInfo.size during HistoryServerDiskManager initializing
### What changes were proposed in this pull request? Update ApplicationStoreInfo.size to real size during HistoryServerDiskManager initializing. ### Why are the changes needed? This PR is for fixing bug [32024](https://issues.apache.org/jira/browse/SPARK-32024). We found after history server restart, below error would randomly happen: "java.lang.IllegalStateException: Disk usage tracker went negative (now = -***, delta = -***)" from `HistoryServerDiskManager`. ![Capture](https://user-images.githubusercontent.com/10524738/85034468-fda4ae80-b136-11ea-9011-f0c3e6508002.JPG) **Cause**: Reading data from level db would trigger table file compaction, which may also trigger size of level db directory changes. This size change may not be recorded in LevelDB (`ApplicationStoreInfo` in `listing`). When service restarts, `currentUsage` is calculated from real directory size, but `ApplicationStoreInfo` are loaded from leveldb, then `currentUsage` may be less then sum of `ApplicationStoreInfo.size`. In `makeRoom()` function, `ApplicationStoreInfo.size` is used to update usage. Then `currentUsage` becomes negative after several round of `release()` and `lease()` (`makeRoom()`). **Reproduce**: we can reproduce this issue in dev environment by reducing config value of "spark.history.retainedApplications" and "spark.history.store.maxDiskUsage" to some small values. Here are steps: 1. start history server, load some applications and access some pages (maybe "stages" page to trigger leveldb compaction). 2. restart HS, and refresh pages. I also added an UT to simulate this case in `HistoryServerDiskManagerSuite`. **Benefit**: this change would help improve history server reliability. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Add unit test and manually tested it. Closes #28859 from zhli1142015/update-ApplicationStoreInfo.size-during-disk-manager-initialize. Authored-by: Zhen Li <[email protected]> Signed-off-by: Jungtaek Lim (HeartSaVioR) <[email protected]> (cherry picked from commit 8e7fc04) Signed-off-by: Jungtaek Lim (HeartSaVioR) <[email protected]>
1 parent 0939730 commit ac2c6cd

File tree

2 files changed

+64
-3
lines changed

2 files changed

+64
-3
lines changed

core/src/main/scala/org/apache/spark/deploy/history/HistoryServerDiskManager.scala

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -75,14 +75,29 @@ private class HistoryServerDiskManager(
7575

7676
// Go through the recorded store directories and remove any that may have been removed by
7777
// external code.
78-
val orphans = listing.view(classOf[ApplicationStoreInfo]).asScala.filter { info =>
79-
!new File(info.path).exists()
80-
}.toSeq
78+
val (existences, orphans) = listing
79+
.view(classOf[ApplicationStoreInfo])
80+
.asScala
81+
.toSeq
82+
.partition { info =>
83+
new File(info.path).exists()
84+
}
8185

8286
orphans.foreach { info =>
8387
listing.delete(info.getClass(), info.path)
8488
}
8589

90+
// Reading level db would trigger table file compaction, then it may cause size of level db
91+
// directory changed. When service restarts, "currentUsage" is calculated from real directory
92+
// size. Update "ApplicationStoreInfo.size" to ensure "currentUsage" equals
93+
// sum of "ApplicationStoreInfo.size".
94+
existences.foreach { info =>
95+
val fileSize = sizeOf(new File(info.path))
96+
if (fileSize != info.size) {
97+
listing.write(info.copy(size = fileSize))
98+
}
99+
}
100+
86101
logInfo("Initialized disk manager: " +
87102
s"current usage = ${Utils.bytesToString(currentUsage.get())}, " +
88103
s"max usage = ${Utils.bytesToString(maxUsage)}")

core/src/test/scala/org/apache/spark/deploy/history/HistoryServerDiskManagerSuite.scala

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -158,4 +158,50 @@ class HistoryServerDiskManagerSuite extends SparkFunSuite with BeforeAndAfter {
158158
assert(manager.approximateSize(50L, true) > 50L)
159159
}
160160

161+
test("SPARK-32024: update ApplicationStoreInfo.size during initializing") {
162+
val manager = mockManager()
163+
val leaseA = manager.lease(2)
164+
doReturn(3L).when(manager).sizeOf(meq(leaseA.tmpPath))
165+
val dstA = leaseA.commit("app1", None)
166+
assert(manager.free() === 0)
167+
assert(manager.committed() === 3)
168+
// Listing store tracks dstA now.
169+
assert(store.read(classOf[ApplicationStoreInfo], dstA.getAbsolutePath).size === 3)
170+
171+
// Simulate: service restarts, new disk manager (manager1) is initialized.
172+
val manager1 = mockManager()
173+
// Simulate: event KVstore compaction before restart, directory size reduces.
174+
doReturn(2L).when(manager1).sizeOf(meq(dstA))
175+
doReturn(2L).when(manager1).sizeOf(meq(new File(testDir, "apps")))
176+
manager1.initialize()
177+
// "ApplicationStoreInfo.size" is updated for dstA.
178+
assert(store.read(classOf[ApplicationStoreInfo], dstA.getAbsolutePath).size === 2)
179+
assert(manager1.free() === 1)
180+
// If "ApplicationStoreInfo.size" is not correctly updated, "IllegalStateException"
181+
// would be thrown.
182+
val leaseB = manager1.lease(2)
183+
assert(manager1.free() === 1)
184+
doReturn(2L).when(manager1).sizeOf(meq(leaseB.tmpPath))
185+
val dstB = leaseB.commit("app2", None)
186+
assert(manager1.committed() === 2)
187+
// Listing store tracks dstB only, dstA is evicted by "makeRoom()".
188+
assert(store.read(classOf[ApplicationStoreInfo], dstB.getAbsolutePath).size === 2)
189+
190+
val manager2 = mockManager()
191+
// Simulate: cache entities are written after replaying, directory size increases.
192+
doReturn(3L).when(manager2).sizeOf(meq(dstB))
193+
doReturn(3L).when(manager2).sizeOf(meq(new File(testDir, "apps")))
194+
manager2.initialize()
195+
// "ApplicationStoreInfo.size" is updated for dstB.
196+
assert(store.read(classOf[ApplicationStoreInfo], dstB.getAbsolutePath).size === 3)
197+
assert(manager2.free() === 0)
198+
val leaseC = manager2.lease(2)
199+
doReturn(2L).when(manager2).sizeOf(meq(leaseC.tmpPath))
200+
val dstC = leaseC.commit("app3", None)
201+
assert(manager2.free() === 1)
202+
assert(manager2.committed() === 2)
203+
// Listing store tracks dstC only, dstB is evicted by "makeRoom()".
204+
assert(store.read(classOf[ApplicationStoreInfo], dstC.getAbsolutePath).size === 2)
205+
}
206+
161207
}

0 commit comments

Comments
 (0)