Skip to content

Commit e71c45c

Browse files
authored
Merge branch 'master' into cnf-for-partition-pruning
2 parents 2f576fa + 8750363 commit e71c45c

File tree

130 files changed

+3240
-1746
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

130 files changed

+3240
-1746
lines changed

common/kvstore/src/main/java/org/apache/spark/util/kvstore/LevelDB.java

Lines changed: 31 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,10 @@
1919

2020
import java.io.File;
2121
import java.io.IOException;
22+
import java.lang.ref.SoftReference;
2223
import java.util.*;
2324
import java.util.concurrent.ConcurrentHashMap;
25+
import java.util.concurrent.ConcurrentLinkedQueue;
2426
import java.util.concurrent.ConcurrentMap;
2527
import java.util.concurrent.atomic.AtomicReference;
2628
import static java.nio.charset.StandardCharsets.UTF_8;
@@ -64,6 +66,13 @@ public class LevelDB implements KVStore {
6466
private final ConcurrentMap<String, byte[]> typeAliases;
6567
private final ConcurrentMap<Class<?>, LevelDBTypeInfo> types;
6668

69+
/**
70+
* Trying to close a JNI LevelDB handle with a closed DB causes JVM crashes. This is used to
71+
* ensure that all iterators are correctly closed before LevelDB is closed. Soft references
72+
* are used so that an iterator can be GCed when it is only referenced here.
73+
*/
74+
private final ConcurrentLinkedQueue<SoftReference<LevelDBIterator<?>>> iteratorTracker;
75+
6776
public LevelDB(File path) throws Exception {
6877
this(path, new KVStoreSerializer());
6978
}
@@ -94,6 +103,8 @@ public LevelDB(File path, KVStoreSerializer serializer) throws Exception {
94103
aliases = new HashMap<>();
95104
}
96105
typeAliases = new ConcurrentHashMap<>(aliases);
106+
107+
iteratorTracker = new ConcurrentLinkedQueue<>();
97108
}
98109

99110
@Override
@@ -189,7 +200,9 @@ public <T> KVStoreView<T> view(Class<T> type) throws Exception {
189200
@Override
190201
public Iterator<T> iterator() {
191202
try {
192-
return new LevelDBIterator<>(type, LevelDB.this, this);
203+
LevelDBIterator<T> it = new LevelDBIterator<>(type, LevelDB.this, this);
204+
iteratorTracker.add(new SoftReference<>(it));
205+
return it;
193206
} catch (Exception e) {
194207
throw Throwables.propagate(e);
195208
}
@@ -238,6 +251,14 @@ public void close() throws IOException {
238251
}
239252

240253
try {
254+
if (iteratorTracker != null) {
255+
for (SoftReference<LevelDBIterator<?>> ref: iteratorTracker) {
256+
LevelDBIterator<?> it = ref.get();
257+
if (it != null) {
258+
it.close();
259+
}
260+
}
261+
}
241262
_db.close();
242263
} catch (IOException ioe) {
243264
throw ioe;
@@ -252,6 +273,7 @@ public void close() throws IOException {
252273
* with a closed DB can cause JVM crashes, so this ensures that situation does not happen.
253274
*/
254275
void closeIterator(LevelDBIterator<?> it) throws IOException {
276+
notifyIteratorClosed(it);
255277
synchronized (this._db) {
256278
DB _db = this._db.get();
257279
if (_db != null) {
@@ -260,6 +282,14 @@ void closeIterator(LevelDBIterator<?> it) throws IOException {
260282
}
261283
}
262284

285+
/**
286+
* Remove iterator from iterator tracker. `LevelDBIterator` calls this to notify that
287+
* the iterator is closed.
288+
*/
289+
void notifyIteratorClosed(LevelDBIterator<?> it) {
290+
iteratorTracker.removeIf(ref -> it.equals(ref.get()));
291+
}
292+
263293
/** Returns metadata about indices for the given type. */
264294
LevelDBTypeInfo getTypeInfo(Class<?> type) throws Exception {
265295
LevelDBTypeInfo ti = types.get(type);

common/kvstore/src/main/java/org/apache/spark/util/kvstore/LevelDBIterator.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -185,6 +185,7 @@ public boolean skip(long n) {
185185

186186
@Override
187187
public synchronized void close() throws IOException {
188+
db.notifyIteratorClosed(this);
188189
if (!closed) {
189190
it.close();
190191
closed = true;

common/kvstore/src/test/java/org/apache/spark/util/kvstore/LevelDBSuite.java

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
import java.io.File;
2121
import java.util.Arrays;
22+
import java.util.Iterator;
2223
import java.util.List;
2324
import java.util.NoSuchElementException;
2425
import java.util.stream.Collectors;
@@ -276,6 +277,41 @@ public void testNegativeIndexValues() throws Exception {
276277
assertEquals(expected, results);
277278
}
278279

280+
@Test
281+
public void testCloseLevelDBIterator() throws Exception {
282+
// SPARK-31929: test that when LevelDB.close() is called, related LevelDBIterators
283+
// are closed, and that files opened by the iterators are also closed.
284+
File dbPathForCloseTest = File
285+
.createTempFile(
286+
"test_db_close.",
287+
".ldb");
288+
dbPathForCloseTest.delete();
289+
LevelDB dbForCloseTest = new LevelDB(dbPathForCloseTest);
290+
for (int i = 0; i < 8192; i++) {
291+
dbForCloseTest.write(createCustomType1(i));
292+
}
293+
String key = dbForCloseTest
294+
.view(CustomType1.class).iterator().next().key;
295+
assertEquals("key0", key);
296+
Iterator<CustomType1> it0 = dbForCloseTest
297+
.view(CustomType1.class).max(1).iterator();
298+
while (it0.hasNext()) {
299+
it0.next();
300+
}
301+
System.gc();
302+
Iterator<CustomType1> it1 = dbForCloseTest
303+
.view(CustomType1.class).iterator();
304+
assertEquals("key0", it1.next().key);
305+
try (KVStoreIterator<CustomType1> it2 = dbForCloseTest
306+
.view(CustomType1.class).closeableIterator()) {
307+
assertEquals("key0", it2.next().key);
308+
}
309+
dbForCloseTest.close();
310+
assertTrue(dbPathForCloseTest.exists());
311+
FileUtils.deleteQuietly(dbPathForCloseTest);
312+
assertTrue(!dbPathForCloseTest.exists());
313+
}
314+
279315
private CustomType1 createCustomType1(int i) {
280316
CustomType1 t = new CustomType1();
281317
t.key = "key" + i;

core/src/main/resources/org/apache/spark/ui/static/vis-timeline-graph2d.min.css

Lines changed: 1 addition & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

core/src/main/resources/org/apache/spark/ui/static/vis-timeline-graph2d.min.css.map

Lines changed: 0 additions & 1 deletion
This file was deleted.

core/src/main/resources/org/apache/spark/ui/static/vis-timeline-graph2d.min.js

Lines changed: 20 additions & 50 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

core/src/main/resources/org/apache/spark/ui/static/vis-timeline-graph2d.min.js.map

Lines changed: 0 additions & 1 deletion
This file was deleted.

core/src/main/resources/org/apache/spark/ui/static/webui.js

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,8 @@ $(function() {
8787
collapseTablePageLoad('collapse-aggregated-poolActiveStages','aggregated-poolActiveStages');
8888
collapseTablePageLoad('collapse-aggregated-tasks','aggregated-tasks');
8989
collapseTablePageLoad('collapse-aggregated-rdds','aggregated-rdds');
90-
collapseTablePageLoad('collapse-aggregated-activeBatches','aggregated-activeBatches');
90+
collapseTablePageLoad('collapse-aggregated-waitingBatches','aggregated-waitingBatches');
91+
collapseTablePageLoad('collapse-aggregated-runningBatches','aggregated-runningBatches');
9192
collapseTablePageLoad('collapse-aggregated-completedBatches','aggregated-completedBatches');
9293
collapseTablePageLoad('collapse-aggregated-runningExecutions','aggregated-runningExecutions');
9394
collapseTablePageLoad('collapse-aggregated-completedExecutions','aggregated-completedExecutions');

core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -518,7 +518,7 @@ private[spark] class ExecutorAllocationManager(
518518
if (testing) {
519519
throw new SparkException("ResourceProfile Id was UNKNOWN, this is not expected")
520520
}
521-
logWarning(s"Not removing executor $executorIdsToBeRemoved because the " +
521+
logWarning(s"Not removing executor $executorIdToBeRemoved because the " +
522522
"ResourceProfile was UNKNOWN!")
523523
} else {
524524
// get the running total as we remove or initialize it to the count - pendingRemoval

core/src/main/scala/org/apache/spark/executor/Executor.scala

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -323,10 +323,7 @@ private[spark] class Executor(
323323
val threadName = s"Executor task launch worker for task $taskId"
324324
val taskName = taskDescription.name
325325
val mdcProperties = taskDescription.properties.asScala
326-
.filter(_._1.startsWith("mdc.")).map { item =>
327-
val key = item._1.substring(4)
328-
(key, item._2)
329-
}.toSeq
326+
.filter(_._1.startsWith("mdc.")).toSeq
330327

331328
/** If specified, this task has been killed and this option contains the reason. */
332329
@volatile private var reasonIfKilled: Option[String] = None
@@ -705,7 +702,7 @@ private[spark] class Executor(
705702
MDC.clear()
706703
mdc.foreach { case (key, value) => MDC.put(key, value) }
707704
// avoid overriding the taskName by the user
708-
MDC.put("taskName", taskName)
705+
MDC.put("mdc.taskName", taskName)
709706
}
710707

711708
/**

0 commit comments

Comments
 (0)