@@ -239,12 +239,17 @@ private[spark] object ReliableCheckpointRDD extends Logging {
       val fs = partitionerFilePath.getFileSystem(sc.hadoopConfiguration)
       val fileInputStream = fs.open(partitionerFilePath, bufferSize)
       val serializer = SparkEnv.get.serializer.newInstance()
-      val deserializeStream = serializer.deserializeStream(fileInputStream)
-      val partitioner = Utils.tryWithSafeFinally[Partitioner] {
-        deserializeStream.readObject[Partitioner]
+      val partitioner = Utils.tryWithSafeFinally {
+        val deserializeStream = serializer.deserializeStream(fileInputStream)
+        Utils.tryWithSafeFinally {
+          deserializeStream.readObject[Partitioner]
+        } {
+          deserializeStream.close()
+        }
       } {
-        deserializeStream.close()
+        fileInputStream.close()
       }
+
       logDebug(s"Read partitioner from $partitionerFilePath")
       Some(partitioner)
     } catch {
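Note: the hunk above nests two tryWithSafeFinally calls so that both the deserialization stream and the underlying file stream get closed even when reading the partitioner fails. As a rough, standalone sketch of the helper's semantics (assuming the two-parameter-list shape used in the diff; this is not Spark's actual implementation), the pattern behaves like this:

// Standalone sketch (not Spark's implementation) of the try-with-safe-finally idea:
// always run the cleanup, and if both the body and the cleanup throw, attach the
// cleanup's exception as suppressed so the body's error is not masked.
def tryWithSafeFinallySketch[T](block: => T)(finallyBlock: => Unit): T = {
  var originalThrowable: Throwable = null
  try {
    block
  } catch {
    case t: Throwable =>
      originalThrowable = t
      throw t
  } finally {
    try {
      finallyBlock
    } catch {
      case t: Throwable if originalThrowable != null =>
        // Keep the body's exception as the primary failure.
        originalThrowable.addSuppressed(t)
    }
  }
}

In the checkpoint reader above, the outer call owns fileInputStream and the inner call owns the deserialization stream built on top of it, so each resource is closed exactly once.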
13 changes: 9 additions & 4 deletions core/src/test/scala/org/apache/spark/FileSuite.scala
@@ -58,10 +58,15 @@ class FileSuite extends SparkFunSuite with LocalSparkContext {
     nums.saveAsTextFile(outputDir)
     // Read the plain text file and check it's OK
     val outputFile = new File(outputDir, "part-00000")
-    val content = Source.fromFile(outputFile).mkString
-    assert(content === "1\n2\n3\n4\n")
-    // Also try reading it in as a text file RDD
-    assert(sc.textFile(outputDir).collect().toList === List("1", "2", "3", "4"))
+    val bufferSrc = Source.fromFile(outputFile)
+    Utils.tryWithSafeFinally {
+      val content = bufferSrc.mkString
+      assert(content === "1\n2\n3\n4\n")
+      // Also try reading it in as a text file RDD
+      assert(sc.textFile(outputDir).collect().toList === List("1", "2", "3", "4"))
+    } {
+      bufferSrc.close()
+    }
   }

   test("text files (compressed)") {
@@ -33,7 +33,7 @@ import org.scalatest.BeforeAndAfterEach
 import org.apache.spark.SparkFunSuite
 import org.apache.spark.api.r.RUtils
 import org.apache.spark.deploy.SparkSubmitUtils.MavenCoordinate
-import org.apache.spark.util.ResetSystemProperties
+import org.apache.spark.util.{ResetSystemProperties, Utils}

 class RPackageUtilsSuite
   extends SparkFunSuite
@@ -74,9 +74,13 @@ class RPackageUtilsSuite
     val deps = Seq(dep1, dep2).mkString(",")
     IvyTestUtils.withRepository(main, Some(deps), None, withR = true) { repo =>
       val jars = Seq(main, dep1, dep2).map(c => new JarFile(getJarPath(c, new File(new URI(repo)))))
-      assert(RPackageUtils.checkManifestForR(jars(0)), "should have R code")
-      assert(!RPackageUtils.checkManifestForR(jars(1)), "should not have R code")
-      assert(!RPackageUtils.checkManifestForR(jars(2)), "should not have R code")
+      Utils.tryWithSafeFinally {
+        assert(RPackageUtils.checkManifestForR(jars(0)), "should have R code")
+        assert(!RPackageUtils.checkManifestForR(jars(1)), "should not have R code")
+        assert(!RPackageUtils.checkManifestForR(jars(2)), "should not have R code")
+      } {
+        jars.foreach(_.close())
+      }
     }
   }

@@ -131,7 +135,7 @@ class RPackageUtilsSuite

   test("SparkR zipping works properly") {
     val tempDir = Files.createTempDir()
-    try {
+    Utils.tryWithSafeFinally {
       IvyTestUtils.writeFile(tempDir, "test.R", "abc")
       val fakeSparkRDir = new File(tempDir, "SparkR")
       assert(fakeSparkRDir.mkdirs())
@@ -144,14 +148,19 @@
       IvyTestUtils.writeFile(fakePackageDir, "DESCRIPTION", "abc")
       val finalZip = RPackageUtils.zipRLibraries(tempDir, "sparkr.zip")
       assert(finalZip.exists())
-      val entries = new ZipFile(finalZip).entries().asScala.map(_.getName).toSeq
-      assert(entries.contains("/test.R"))
-      assert(entries.contains("/SparkR/abc.R"))
-      assert(entries.contains("/SparkR/DESCRIPTION"))
-      assert(!entries.contains("/package.zip"))
-      assert(entries.contains("/packageTest/def.R"))
-      assert(entries.contains("/packageTest/DESCRIPTION"))
-    } finally {
+      val zipFile = new ZipFile(finalZip)
+      Utils.tryWithSafeFinally {
+        val entries = zipFile.entries().asScala.map(_.getName).toSeq
+        assert(entries.contains("/test.R"))
+        assert(entries.contains("/SparkR/abc.R"))
+        assert(entries.contains("/SparkR/DESCRIPTION"))
+        assert(!entries.contains("/package.zip"))
+        assert(entries.contains("/packageTest/def.R"))
+        assert(entries.contains("/packageTest/DESCRIPTION"))
+      } {
+        zipFile.close()
+      }
+    } {
       FileUtils.deleteDirectory(tempDir)
     }
   }
@@ -449,8 +449,14 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc
     val cstream = codec.map(_.compressedOutputStream(fstream)).getOrElse(fstream)
     val bstream = new BufferedOutputStream(cstream)
     if (isNewFormat) {
-      EventLoggingListener.initEventLog(new FileOutputStream(file))
+      val newFormatStream = new FileOutputStream(file)
+      Utils.tryWithSafeFinally {
+        EventLoggingListener.initEventLog(newFormatStream)
+      } {
+        newFormatStream.close()
+      }
     }
+
     val writer = new OutputStreamWriter(bstream, StandardCharsets.UTF_8)
     Utils.tryWithSafeFinally {
       events.foreach(e => writer.write(compact(render(JsonProtocol.sparkEventToJson(e))) + "\n"))
@@ -202,8 +202,6 @@ class EventLoggingListenerSuite extends SparkFunSuite with LocalSparkContext wit

     // Make sure expected events exist in the log file.
     val logData = EventLoggingListener.openEventLog(new Path(eventLogger.logPath), fileSystem)
-    val logStart = SparkListenerLogStart(SPARK_VERSION)
-    val lines = readLines(logData)
     val eventSet = mutable.Set(
       SparkListenerApplicationStart,
       SparkListenerBlockManagerAdded,
@@ -216,19 +214,25 @@ class EventLoggingListenerSuite extends SparkFunSuite with LocalSparkContext wit
       SparkListenerTaskStart,
       SparkListenerTaskEnd,
       SparkListenerApplicationEnd).map(Utils.getFormattedClassName)
-    lines.foreach { line =>
-      eventSet.foreach { event =>
-        if (line.contains(event)) {
-          val parsedEvent = JsonProtocol.sparkEventFromJson(parse(line))
-          val eventType = Utils.getFormattedClassName(parsedEvent)
-          if (eventType == event) {
-            eventSet.remove(event)
-          }
-        }
-      }
-    }
-    assert(JsonProtocol.sparkEventFromJson(parse(lines(0))) === logStart)
-    assert(eventSet.isEmpty, "The following events are missing: " + eventSet.toSeq)
+    Utils.tryWithSafeFinally {
+      val logStart = SparkListenerLogStart(SPARK_VERSION)
+      val lines = readLines(logData)
+      lines.foreach { line =>
+        eventSet.foreach { event =>
+          if (line.contains(event)) {
+            val parsedEvent = JsonProtocol.sparkEventFromJson(parse(line))
+            val eventType = Utils.getFormattedClassName(parsedEvent)
+            if (eventType == event) {
+              eventSet.remove(event)
+            }
+          }
+        }
+      }
+      assert(JsonProtocol.sparkEventFromJson(parse(lines(0))) === logStart)
+      assert(eventSet.isEmpty, "The following events are missing: " + eventSet.toSeq)
+    } {
+      logData.close()
+    }
   }

   private def readLines(in: InputStream): Seq[String] = {
@@ -183,9 +183,9 @@ class TaskResultGetterSuite extends SparkFunSuite with BeforeAndAfter with Local

     // ensure we reset the classloader after the test completes
     val originalClassLoader = Thread.currentThread.getContextClassLoader
-    try {
+    val loader = new MutableURLClassLoader(new Array[URL](0), originalClassLoader)
+    Utils.tryWithSafeFinally {
       // load the exception from the jar
-      val loader = new MutableURLClassLoader(new Array[URL](0), originalClassLoader)
       loader.addURL(jarFile.toURI.toURL)
       Thread.currentThread().setContextClassLoader(loader)
       val excClass: Class[_] = Utils.classForName("repro.MyException")
@@ -209,8 +209,9 @@ class TaskResultGetterSuite extends SparkFunSuite with BeforeAndAfter with Local

       assert(expectedFailure.findFirstMatchIn(exceptionMessage).isDefined)
       assert(unknownFailure.findFirstMatchIn(exceptionMessage).isEmpty)
-    } finally {
+    } {
       Thread.currentThread.setContextClassLoader(originalClassLoader)
+      loader.close()
     }
   }
Review comment on the cleanup block above:

Contributor: Might be good idea to swap these two lines - so that classloader is always reset even if loader.close() fails

Member (Author): Sure, makes sense.
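The ordering matters because statements in the cleanup block run in sequence, so a failure in the first one skips the rest. A small hypothetical illustration of that concern (FailingLoader is invented for this example and is not part of the change; a plain try/finally is used here, but the same ordering applies inside the tryWithSafeFinally cleanup block):

// Hypothetical illustration: if close() ran first and threw, the context
// classloader would never be restored for later tests.
import java.io.{Closeable, IOException}

class FailingLoader extends Closeable {
  override def close(): Unit = throw new IOException("close failed")
}

val originalClassLoader = Thread.currentThread.getContextClassLoader
val loader = new FailingLoader
try {
  // test body that installs a different context classloader would go here
} finally {
  // Restore the classloader first so a failing close() cannot skip the reset.
  Thread.currentThread.setContextClassLoader(originalClassLoader)
  loader.close()  // may throw; the classloader has already been restored
}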

@@ -155,13 +155,17 @@ class MLUtilsSuite extends SparkFunSuite with MLlibTestSparkContext {
     val tempDir = Utils.createTempDir()
     val outputDir = new File(tempDir, "output")
     MLUtils.saveAsLibSVMFile(examples, outputDir.toURI.toString)
-    val lines = outputDir.listFiles()
+    val sources = outputDir.listFiles()
       .filter(_.getName.startsWith("part-"))
-      .flatMap(Source.fromFile(_).getLines())
-      .toSet
-    val expected = Set("1.1 1:1.23 3:4.56", "0.0 1:1.01 2:2.02 3:3.03")
-    assert(lines === expected)
-    Utils.deleteRecursively(tempDir)
+      .map(Source.fromFile)
+    Utils.tryWithSafeFinally {
+      val lines = sources.flatMap(_.getLines()).toSet
+      val expected = Set("1.1 1:1.23 3:4.56", "0.0 1:1.01 2:2.02 3:3.03")
+      assert(lines === expected)
+    } {
+      sources.foreach(_.close())
+      Utils.deleteRecursively(tempDir)
+    }
   }

   test("appendBias") {
@@ -1805,6 +1805,7 @@ public Integer call(String s) {
     // will be re-processed after recovery
     List<List<Integer>> finalResult = JavaCheckpointTestUtils.runStreams(ssc, 2, 3);
     assertOrderInvariantEquals(expectedFinal, finalResult.subList(1, 3));
+    ssc.stop();
     Utils.deleteRecursively(tempDir);
   }
@@ -642,16 +642,18 @@ class CheckpointSuite extends TestSuiteBase with DStreamCheckpointTester
     val fileStream = ssc.textFileStream(testDir.toString)
     // Make value 3 take a large time to process, to ensure that the driver
     // shuts down in the middle of processing the 3rd batch
-    CheckpointSuite.batchThreeShouldBlockIndefinitely = true
-    val mappedStream = fileStream.map(s => {
+    CheckpointSuite.batchThreeShouldBlockALongTime = true
+    val mappedStream = fileStream.map { s =>
       val i = s.toInt
       if (i == 3) {
-        while (CheckpointSuite.batchThreeShouldBlockIndefinitely) {
-          Thread.sleep(Long.MaxValue)
+        if (CheckpointSuite.batchThreeShouldBlockALongTime) {
+          // It's not a good idea to let the thread run forever
+          // as resource won't be correctly released
+          Thread.sleep(6000)
         }
       }
       i
-    })
+    }

     // Reducing over a large window to ensure that recovery from driver failure
     // requires reprocessing of all the files seen before the failure
@@ -691,7 +693,7 @@ class CheckpointSuite extends TestSuiteBase with DStreamCheckpointTester
     }

     // The original StreamingContext has now been stopped.
-    CheckpointSuite.batchThreeShouldBlockIndefinitely = false
+    CheckpointSuite.batchThreeShouldBlockALongTime = false

     // Create files while the streaming driver is down
     for (i <- Seq(4, 5, 6)) {
@@ -928,5 +930,5 @@ class CheckpointSuite extends TestSuiteBase with DStreamCheckpointTester
 }

 private object CheckpointSuite extends Serializable {
-  var batchThreeShouldBlockIndefinitely: Boolean = true
+  var batchThreeShouldBlockALongTime: Boolean = true
 }
@@ -164,6 +164,7 @@ object MasterFailureTest extends Logging {
     val mergedOutput = runStreams(ssc, lastExpectedOutput, maxTimeToRun)

     fileGeneratingThread.join()
+    ssc.stop()
     fs.delete(checkpointDir, true)
     fs.delete(testDir, true)
     logInfo("Finished test after " + killCount + " failures")
@@ -134,13 +134,15 @@ class ReceivedBlockTrackerSuite
     val expectedWrittenData1 = blockInfos1.map(BlockAdditionEvent)
     getWrittenLogData() shouldEqual expectedWrittenData1
     getWriteAheadLogFiles() should have size 1
+    tracker1.stop()

     incrementTime()

     // Recovery without recovery from WAL and verify list of unallocated blocks is empty
     val tracker1_ = createTracker(clock = manualClock, recoverFromWriteAheadLog = false)
     tracker1_.getUnallocatedBlocks(streamId) shouldBe empty
     tracker1_.hasUnallocatedReceivedBlocks should be (false)
+    tracker1_.stop()

     // Restart tracker and verify recovered list of unallocated blocks
     val tracker2 = createTracker(clock = manualClock, recoverFromWriteAheadLog = true)
@@ -163,6 +165,7 @@ class ReceivedBlockTrackerSuite
     val blockInfos2 = addBlockInfos(tracker2)
     tracker2.allocateBlocksToBatch(batchTime2)
     tracker2.getBlocksOfBatchAndStream(batchTime2, streamId) shouldEqual blockInfos2
+    tracker2.stop()

     // Verify whether log has correct contents
     val expectedWrittenData2 = expectedWrittenData1 ++
@@ -192,6 +195,7 @@
       getWriteAheadLogFiles() should not contain oldestLogFile
     }
     printLogFiles("After clean")
+    tracker3.stop()

     // Restart tracker and verify recovered state, specifically whether info about the first
     // batch has been removed, but not the second batch
@@ -200,6 +204,7 @@
     tracker4.getUnallocatedBlocks(streamId) shouldBe empty
     tracker4.getBlocksOfBatchAndStream(batchTime1, streamId) shouldBe empty // should be cleaned
     tracker4.getBlocksOfBatchAndStream(batchTime2, streamId) shouldEqual blockInfos2
+    tracker4.stop()
   }

   test("disable write ahead log when checkpoint directory is not set") {
@@ -139,6 +139,7 @@ abstract class CommonWriteAheadLogTests(
         assert(getLogFilesInDirectory(testDir).size < logFiles.size)
       }
     }
+    writeAheadLog.close()
   }

   test(testPrefix + "handling file errors while reading rotating logs") {