@@ -243,7 +243,16 @@ private[spark] object ReliableCheckpointRDD extends Logging {
if (fs.exists(partitionerFilePath)) {
val fileInputStream = fs.open(partitionerFilePath, bufferSize)
val serializer = SparkEnv.get.serializer.newInstance()
val deserializeStream = serializer.deserializeStream(fileInputStream)
// make sure that the file is closed if an error occurs during deserialization
val deserializeStream =
Member:
You should use finally in contexts like this.

Contributor Author (taoli91, Apr 26, 2016):
@srowen The deserializeStream will be closed. We only need this when the deserializeStream method throws an exception and we never reach line 258, where the deserializeStream, along with the inner fileInputStream, would be closed.

Member:
Sorry, I mean put this in the block below that closes the deserialization stream, at best.

Contributor Author:
@srowen It seems we would need extra code to work around the scoping problem if we put the close in another clause. It's probably cleaner to stick with this solution.

Member:
I'm not sure what you mean -- it's less code. Just open the deserializeStream inside the block and close it in the finally block that follows. Actually, closing deserializeStream will already (should already) close the underlying stream anyway. It doesn't handle errors while making the stream from the original stream, but, constructors of the deserializer streams aren't reading the stream anyway. I suspect it's fine as-is, but it would be extra-defensive to also close the fileInputStream, yes.

Contributor Author (taoli91, May 4, 2016):
@srowen Probably a code snippet will explain things better. Do you mean this:

        var deserializeStream : DeserializationStream = null

        val partitioner = Utils.tryWithSafeFinally[Partitioner] {
          deserializeStream = serializer.deserializeStream(fileInputStream)
          deserializeStream.readObject[Partitioner]
        } {
          deserializeStream.close()
        }

There are a few things I'm concerned about:

  1. The semantics have changed slightly (deserializeStream is now a var instead of a val).
  2. I'm not sure it is always safe to close a partially initialized deserializeStream when the deserialization throws an exception. I'm fairly sure that if we close the fileInputStream first, closing the deserializeStream may then throw an exception complaining that the input has already been closed.

FYI, I found this problem while running the test case "checkpointing partitioners" in CheckpointSuite, with the corruptPartitionerFile flag turned on.
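
For context on the helper being discussed, here is a rough sketch of the contract behind Utils.tryWithSafeFinally. This is an illustration only; the real implementation in org.apache.spark.util.Utils may differ in details such as how suppressed exceptions are recorded. The point is that the cleanup block always runs, and a failure inside the cleanup does not mask a failure from the main block.

    // Sketch only, not the actual Spark code.
    def tryWithSafeFinally[T](block: => T)(finallyBlock: => Unit): T = {
      var originalThrowable: Throwable = null
      try {
        block
      } catch {
        case t: Throwable =>
          originalThrowable = t
          throw t
      } finally {
        try {
          finallyBlock
        } catch {
          case t: Throwable if originalThrowable != null =>
            // keep the original failure; record the cleanup failure as suppressed
            originalThrowable.addSuppressed(t)
        }
      }
    }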

Member (srowen, May 4, 2016):
Yeah that's what I mean. Well, if I were right that serializer.deserializeStream can't fail, then the existing code would already be fine, so this suggestion doesn't help unless fileInputStream gets a similar treatment, and that's too complex. I'm curious how that fails given the current implementation, but, even if it couldn't now, it could in the future.

You might normally resolve this with nested try blocks; I think streams aren't supposed to fail if closed twice, so it's safe to close the underlying stream for good measure. Still, at that point it's no less complex, so I can see that this is as clear as anything.

try {
serializer.deserializeStream(fileInputStream)
} catch {
case ex : Throwable =>
fileInputStream.close()
throw ex
}

val partitioner = Utils.tryWithSafeFinally[Partitioner] {
deserializeStream.readObject[Partitioner]
} {
13 changes: 9 additions & 4 deletions core/src/test/scala/org/apache/spark/FileSuite.scala
@@ -56,10 +56,15 @@ class FileSuite extends SparkFunSuite with LocalSparkContext {
nums.saveAsTextFile(outputDir)
// Read the plain text file and check it's OK
val outputFile = new File(outputDir, "part-00000")
val content = Source.fromFile(outputFile).mkString
assert(content === "1\n2\n3\n4\n")
// Also try reading it in as a text file RDD
assert(sc.textFile(outputDir).collect().toList === List("1", "2", "3", "4"))
val bufferSrc = Source.fromFile(outputFile)
try {
val content = bufferSrc.mkString
assert(content === "1\n2\n3\n4\n")
// Also try reading it in as a text file RDD
assert(sc.textFile(outputDir).collect().toList === List("1", "2", "3", "4"))
} finally {
bufferSrc.close()
}
}

test("text files (compressed)") {
@@ -77,6 +77,7 @@ class RPackageUtilsSuite
assert(RPackageUtils.checkManifestForR(jars(0)), "should have R code")
assert(!RPackageUtils.checkManifestForR(jars(1)), "should not have R code")
assert(!RPackageUtils.checkManifestForR(jars(2)), "should not have R code")
jars.foreach(_.close())
}
}

@@ -144,13 +145,15 @@ class RPackageUtilsSuite
IvyTestUtils.writeFile(fakePackageDir, "DESCRIPTION", "abc")
val finalZip = RPackageUtils.zipRLibraries(tempDir, "sparkr.zip")
assert(finalZip.exists())
val entries = new ZipFile(finalZip).entries().asScala.map(_.getName).toSeq
val zipFile = new ZipFile(finalZip)
val entries = zipFile.entries().asScala.map(_.getName).toSeq
assert(entries.contains("/test.R"))
assert(entries.contains("/SparkR/abc.R"))
assert(entries.contains("/SparkR/DESCRIPTION"))
assert(!entries.contains("/package.zip"))
assert(entries.contains("/packageTest/def.R"))
assert(entries.contains("/packageTest/DESCRIPTION"))
zipFile.close()
} finally {
FileUtils.deleteDirectory(tempDir)
}
@@ -415,8 +415,14 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc
val cstream = codec.map(_.compressedOutputStream(fstream)).getOrElse(fstream)
val bstream = new BufferedOutputStream(cstream)
if (isNewFormat) {
EventLoggingListener.initEventLog(new FileOutputStream(file))
val newFormatStream = new FileOutputStream(file)
Utils.tryWithSafeFinally {
EventLoggingListener.initEventLog(newFormatStream)
} {
newFormatStream.close()
}
}

val writer = new OutputStreamWriter(bstream, StandardCharsets.UTF_8)
Utils.tryWithSafeFinally {
events.foreach(e => writer.write(compact(render(JsonProtocol.sparkEventToJson(e))) + "\n"))
@@ -202,8 +202,6 @@ class EventLoggingListenerSuite extends SparkFunSuite with LocalSparkContext wit

// Make sure expected events exist in the log file.
val logData = EventLoggingListener.openEventLog(new Path(eventLogger.logPath), fileSystem)
val logStart = SparkListenerLogStart(SPARK_VERSION)
val lines = readLines(logData)
val eventSet = mutable.Set(
SparkListenerApplicationStart,
SparkListenerBlockManagerAdded,
@@ -216,19 +214,25 @@ class EventLoggingListenerSuite extends SparkFunSuite with LocalSparkContext wit
SparkListenerTaskStart,
SparkListenerTaskEnd,
SparkListenerApplicationEnd).map(Utils.getFormattedClassName)
lines.foreach { line =>
eventSet.foreach { event =>
if (line.contains(event)) {
val parsedEvent = JsonProtocol.sparkEventFromJson(parse(line))
val eventType = Utils.getFormattedClassName(parsedEvent)
if (eventType == event) {
eventSet.remove(event)
try {
val logStart = SparkListenerLogStart(SPARK_VERSION)
val lines = readLines(logData)
Member:
Here, I think readLines is the only thing that reads logData? After it's done, you could close the stream. Maybe that's tidier than wrapping so much in the try-finally block.

Contributor Author (taoli91, May 4, 2016):
@srowen I tried:

    val logData = EventLoggingListener.openEventLog(new Path(eventLogger.logPath), fileSystem)
    var lines : Seq[String] = null
    try {
      lines = readLines(logData)
    } finally {
      logData.close()
    }

It threw an IOException complaining that the stream was closed.

I think readLines is somewhat lazy, so it won't actually read the file until we advance the iterator.
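
If laziness is indeed the cause, one way to keep the early close would be to force the sequence while the stream is still open. This is a sketch only, assuming readLines is backed by something like Source.fromInputStream(in).getLines().toSeq, which can produce a lazily evaluated Stream:

    // Sketch: materialize all lines before closing the underlying stream.
    val logData = EventLoggingListener.openEventLog(new Path(eventLogger.logPath), fileSystem)
    val lines =
      try {
        readLines(logData).toList // .toList forces every line to be read here
      } finally {
        logData.close()
      }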

Member:
OK sounds fine. How about pulling out eventSet at least?

lines.foreach { line =>
eventSet.foreach { event =>
if (line.contains(event)) {
val parsedEvent = JsonProtocol.sparkEventFromJson(parse(line))
val eventType = Utils.getFormattedClassName(parsedEvent)
if (eventType == event) {
eventSet.remove(event)
}
}
}
}
assert(JsonProtocol.sparkEventFromJson(parse(lines(0))) === logStart)
assert(eventSet.isEmpty, "The following events are missing: " + eventSet.toSeq)
} finally {
logData.close()
}
assert(JsonProtocol.sparkEventFromJson(parse(lines(0))) === logStart)
assert(eventSet.isEmpty, "The following events are missing: " + eventSet.toSeq)
}

private def readLines(in: InputStream): Seq[String] = {
@@ -183,9 +183,9 @@ class TaskResultGetterSuite extends SparkFunSuite with BeforeAndAfter with Local

// ensure we reset the classloader after the test completes
val originalClassLoader = Thread.currentThread.getContextClassLoader
val loader = new MutableURLClassLoader(new Array[URL](0), originalClassLoader)
try {
// load the exception from the jar
val loader = new MutableURLClassLoader(new Array[URL](0), originalClassLoader)
loader.addURL(jarFile.toURI.toURL)
Thread.currentThread().setContextClassLoader(loader)
val excClass: Class[_] = Utils.classForName("repro.MyException")
@@ -210,6 +210,7 @@ class TaskResultGetterSuite extends SparkFunSuite with BeforeAndAfter with Local
assert(expectedFailure.findFirstMatchIn(exceptionMessage).isDefined)
assert(unknownFailure.findFirstMatchIn(exceptionMessage).isEmpty)
} finally {
loader.close()
Thread.currentThread.setContextClassLoader(originalClassLoader)
}
}
@@ -151,12 +151,14 @@ class MLUtilsSuite extends SparkFunSuite with MLlibTestSparkContext {
val tempDir = Utils.createTempDir()
val outputDir = new File(tempDir, "output")
MLUtils.saveAsLibSVMFile(examples, outputDir.toURI.toString)
val lines = outputDir.listFiles()
val sources = outputDir.listFiles()
.filter(_.getName.startsWith("part-"))
.flatMap(Source.fromFile(_).getLines())
.toSet
.map(Source.fromFile)

val lines = sources.flatMap(_.getLines()).toSet
val expected = Set("1.1 1:1.23 3:4.56", "0.0 1:1.01 2:2.02 3:3.03")
assert(lines === expected)
sources.foreach(_.close())
Utils.deleteRecursively(tempDir)
}

@@ -194,10 +194,13 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
// Finally, stop the endpoint
ssc.env.rpcEnv.stop(endpoint)
endpoint = null
receivedBlockTracker.stop()
logInfo("ReceiverTracker stopped")
trackerState = Stopped
}

// note that the output writer is created at construction time, so we have to close
// it even if the tracker hasn't been started
receivedBlockTracker.stop()
}

/** Allocate all unallocated blocks to the given batch. */
@@ -1805,6 +1805,7 @@ public Integer call(String s) {
// will be re-processed after recovery
List<List<Integer>> finalResult = JavaCheckpointTestUtils.runStreams(ssc, 2, 3);
assertOrderInvariantEquals(expectedFinal, finalResult.subList(1, 3));
ssc.stop();
Utils.deleteRecursively(tempDir);
}

@@ -640,16 +640,18 @@ class CheckpointSuite extends TestSuiteBase with DStreamCheckpointTester
val fileStream = ssc.textFileStream(testDir.toString)
// Make value 3 take a large time to process, to ensure that the driver
// shuts down in the middle of processing the 3rd batch
CheckpointSuite.batchThreeShouldBlockIndefinitely = true
val mappedStream = fileStream.map(s => {
CheckpointSuite.batchThreeShouldBlockALongTime = true
val mappedStream = fileStream.map{ s =>
val i = s.toInt
if (i == 3) {
while (CheckpointSuite.batchThreeShouldBlockIndefinitely) {
Thread.sleep(Long.MaxValue)
if (CheckpointSuite.batchThreeShouldBlockALongTime) {
// It's not a good idea to let the thread run forever,
// as resources won't be released correctly
Thread.sleep(6000)
Member:
If the logic is that this needs to wait indefinitely, then this needs to be an error if it times out. It may require a different API call to accomplish this.

Contributor Author (taoli91, Apr 26, 2016):
@srowen It's a bit hacky here, but I can't find a better way. The problem is that if the task waits indefinitely, the reader in the NewHadoopRDD never gets a chance to close, and Windows cannot delete a file whose handle is still held. 6s is about the time needed to finish this test case.

Member:
OK, how about changing the text to "should block a long time" then? As long as this is more than long enough for the driver to shut down, it seems in line with the intent of the test.

}
}
i
})
}

// Reducing over a large window to ensure that recovery from driver failure
// requires reprocessing of all the files seen before the failure
@@ -689,7 +691,7 @@ class CheckpointSuite extends TestSuiteBase with DStreamCheckpointTester
}

// The original StreamingContext has now been stopped.
CheckpointSuite.batchThreeShouldBlockIndefinitely = false
CheckpointSuite.batchThreeShouldBlockALongTime = false

// Create files while the streaming driver is down
for (i <- Seq(4, 5, 6)) {
@@ -926,5 +928,5 @@ class CheckpointSuite extends TestSuiteBase with DStreamCheckpointTester
}

private object CheckpointSuite extends Serializable {
var batchThreeShouldBlockIndefinitely: Boolean = true
var batchThreeShouldBlockALongTime: Boolean = true
}
@@ -38,19 +38,16 @@ class MapWithStateSuite extends SparkFunSuite
protected val batchDuration = Seconds(1)

before {
StreamingContext.getActive().foreach { _.stop(stopSparkContext = false) }
checkpointDir = Utils.createTempDir("checkpoint")
StreamingContext.getActive().foreach(_.stop(stopSparkContext = false))
}

after {
StreamingContext.getActive().foreach { _.stop(stopSparkContext = false) }
if (checkpointDir != null) {
Utils.deleteRecursively(checkpointDir)
}
StreamingContext.getActive().foreach(_.stop(stopSparkContext = false))
}

override def beforeAll(): Unit = {
super.beforeAll()
checkpointDir = Utils.createTempDir("checkpoint")
val conf = new SparkConf().setMaster("local").setAppName("MapWithStateSuite")
conf.set("spark.streaming.clock", classOf[ManualClock].getName())
sc = new SparkContext(conf)
Expand All @@ -63,6 +60,7 @@ class MapWithStateSuite extends SparkFunSuite
}
} finally {
super.afterAll()
Utils.deleteRecursively(checkpointDir)
}
}

@@ -164,6 +164,7 @@ object MasterFailureTest extends Logging {
val mergedOutput = runStreams(ssc, lastExpectedOutput, maxTimeToRun)

fileGeneratingThread.join()
ssc.stop()
fs.delete(checkpointDir, true)
fs.delete(testDir, true)
logInfo("Finished test after " + killCount + " failures")
@@ -134,13 +134,15 @@ class ReceivedBlockTrackerSuite
val expectedWrittenData1 = blockInfos1.map(BlockAdditionEvent)
getWrittenLogData() shouldEqual expectedWrittenData1
getWriteAheadLogFiles() should have size 1
tracker1.stop()

incrementTime()

// Recovery without recovery from WAL and verify list of unallocated blocks is empty
val tracker1_ = createTracker(clock = manualClock, recoverFromWriteAheadLog = false)
tracker1_.getUnallocatedBlocks(streamId) shouldBe empty
tracker1_.hasUnallocatedReceivedBlocks should be (false)
tracker1_.stop()

// Restart tracker and verify recovered list of unallocated blocks
val tracker2 = createTracker(clock = manualClock, recoverFromWriteAheadLog = true)
@@ -163,6 +165,7 @@ class ReceivedBlockTrackerSuite
val blockInfos2 = addBlockInfos(tracker2)
tracker2.allocateBlocksToBatch(batchTime2)
tracker2.getBlocksOfBatchAndStream(batchTime2, streamId) shouldEqual blockInfos2
tracker2.stop()

// Verify whether log has correct contents
val expectedWrittenData2 = expectedWrittenData1 ++
@@ -192,6 +195,7 @@ class ReceivedBlockTrackerSuite
getWriteAheadLogFiles() should not contain oldestLogFile
}
printLogFiles("After clean")
tracker3.stop()

// Restart tracker and verify recovered state, specifically whether info about the first
// batch has been removed, but not the second batch
Expand All @@ -200,6 +204,7 @@ class ReceivedBlockTrackerSuite
tracker4.getUnallocatedBlocks(streamId) shouldBe empty
tracker4.getBlocksOfBatchAndStream(batchTime1, streamId) shouldBe empty // should be cleaned
tracker4.getBlocksOfBatchAndStream(batchTime2, streamId) shouldEqual blockInfos2
tracker4.stop()
}

test("disable write ahead log when checkpoint directory is not set") {
@@ -139,6 +139,7 @@ abstract class CommonWriteAheadLogTests(
assert(getLogFilesInDirectory(testDir).size < logFiles.size)
}
}
writeAheadLog.close()
}

test(testPrefix + "handling file errors while reading rotating logs") {