@@ -56,7 +56,7 @@ public abstract class WriteAheadLog {
public abstract void clean(long threshTime, boolean waitForCompletion);

/**
* Close this log and release any resources.
* Close this log and release any resources. It must be idempotent.
*/
public abstract void close();
}
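
For implementers, the strengthened contract means a second (or concurrent) close() call must be a harmless no-op. Below is a minimal sketch of one way to satisfy it, using the same AtomicBoolean-and-getAndSet pattern this patch adopts in BatchedWriteAheadLog; the subclass and its body are hypothetical, not part of the change:

import java.nio.ByteBuffer
import java.util.{Iterator => JIterator}
import java.util.concurrent.atomic.AtomicBoolean

import org.apache.spark.streaming.util.{WriteAheadLog, WriteAheadLogRecordHandle}

class IdempotentWriteAheadLog extends WriteAheadLog {
  // Flips to true exactly once, no matter how many times close() is called.
  private val closed = new AtomicBoolean(false)

  override def write(record: ByteBuffer, time: Long): WriteAheadLogRecordHandle = ???
  override def read(handle: WriteAheadLogRecordHandle): ByteBuffer = ???
  override def readAll(): JIterator[ByteBuffer] = ???
  override def clean(threshTime: Long, waitForCompletion: Boolean): Unit = ???

  override def close(): Unit = {
    // getAndSet is an atomic read-modify-write: only the first caller observes false.
    if (closed.getAndSet(true)) return
    // Release writers, thread pools, file handles, etc. exactly once here.
  }
}
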
@@ -165,11 +165,11 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false

/** Stop the receiver execution thread. */
def stop(graceful: Boolean): Unit = synchronized {
if (isTrackerStarted) {
// First, stop the receivers
trackerState = Stopping
val isStarted: Boolean = isTrackerStarted
trackerState = Stopping
if (isStarted) {
if (!skipReceiverLaunch) {
// Send the stop signal to all the receivers
// First, stop the receivers. Send the stop signal to all the receivers
endpoint.askSync[Boolean](StopAllReceivers)

// Wait for the Spark job that runs the receivers to be over
@@ -194,17 +194,13 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
// Finally, stop the endpoint
ssc.env.rpcEnv.stop(endpoint)
endpoint = null
receivedBlockTracker.stop()
logInfo("ReceiverTracker stopped")
trackerState = Stopped
} else if (isTrackerInitialized) {
trackerState = Stopping
// `ReceivedBlockTracker` is open when this instance is created. We should
// close this even if this `ReceiverTracker` is not started.
receivedBlockTracker.stop()
logInfo("ReceiverTracker stopped")
trackerState = Stopped
}

// `ReceivedBlockTracker` is open when this instance is created. We should
// close this even if this `ReceiverTracker` is not started.
receivedBlockTracker.stop()
logInfo("ReceiverTracker stopped")
trackerState = Stopped
}

/** Allocate all unallocated blocks to the given batch. */
@@ -453,9 +449,6 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
endpoint.send(StartAllReceivers(receivers))
}

/** Check if tracker has been marked for initiated */
private def isTrackerInitialized: Boolean = trackerState == Initialized

/** Check if tracker has been marked for starting */
private def isTrackerStarted: Boolean = trackerState == Started

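
The net effect of the ReceiverTracker change above: the started check is captured once, the state moves to Stopping unconditionally, and receivedBlockTracker.stop() now runs on every stop() call rather than only in the started/initialized branches. A toy model of that control flow (deliberately not the real Spark classes) shows why a second stop() is safe only if the block tracker, and ultimately the write-ahead log underneath it, tolerates repeated shutdown:

object ToyTrackerState extends Enumeration {
  val Initialized, Started, Stopping, Stopped = Value
}

class ToyBlockTracker {
  private var closed = false
  def stop(): Unit = if (!closed) { closed = true; println("block tracker closed") }
}

class ToyReceiverTracker {
  @volatile private var trackerState = ToyTrackerState.Started
  private val receivedBlockTracker = new ToyBlockTracker

  def stop(): Unit = synchronized {
    val isStarted = trackerState == ToyTrackerState.Started
    trackerState = ToyTrackerState.Stopping
    if (isStarted) {
      println("stopping receivers and rpc endpoint")  // happens only on the first call
    }
    receivedBlockTracker.stop()                       // happens on every call
    trackerState = ToyTrackerState.Stopped
  }
}

// val tracker = new ToyReceiverTracker
// tracker.stop(); tracker.stop()  // the second call skips the receiver/endpoint shutdown
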
@@ -19,6 +19,7 @@ package org.apache.spark.streaming.util

import java.nio.ByteBuffer
import java.util.{Iterator => JIterator}
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.LinkedBlockingQueue

import scala.collection.JavaConverters._
@@ -60,7 +61,7 @@ private[util] class BatchedWriteAheadLog(val wrappedLog: WriteAheadLog, conf: Sp
private val walWriteQueue = new LinkedBlockingQueue[Record]()

// Whether the writer thread is active
@volatile private var active: Boolean = true
private val active: AtomicBoolean = new AtomicBoolean(true)
private val buffer = new ArrayBuffer[Record]()

private val batchedWriterThread = startBatchedWriterThread()
@@ -72,7 +73,7 @@ private[util] class BatchedWriteAheadLog(val wrappedLog: WriteAheadLog, conf: Sp
override def write(byteBuffer: ByteBuffer, time: Long): WriteAheadLogRecordHandle = {
val promise = Promise[WriteAheadLogRecordHandle]()
val putSuccessfully = synchronized {
if (active) {
if (active.get()) {
walWriteQueue.offer(Record(byteBuffer, time, promise))
true
} else {
@@ -121,9 +122,7 @@ private[util] class BatchedWriteAheadLog(val wrappedLog: WriteAheadLog, conf: Sp
*/
override def close(): Unit = {
logInfo(s"BatchedWriteAheadLog shutting down at time: ${System.currentTimeMillis()}.")
synchronized {
active = false
}
if (!active.getAndSet(false)) return
batchedWriterThread.interrupt()
batchedWriterThread.join()
while (!walWriteQueue.isEmpty) {
@@ -138,7 +137,7 @@ private[util] class BatchedWriteAheadLog(val wrappedLog: WriteAheadLog, conf: Sp
private def startBatchedWriterThread(): Thread = {
val thread = new Thread(new Runnable {
override def run(): Unit = {
while (active) {
while (active.get()) {
try {
flushRecords()
} catch {
@@ -166,7 +165,7 @@ private[util] class BatchedWriteAheadLog(val wrappedLog: WriteAheadLog, conf: Sp
}
try {
var segment: WriteAheadLogRecordHandle = null
if (buffer.length > 0) {
if (buffer.nonEmpty) {
logDebug(s"Batched ${buffer.length} records for Write Ahead Log write")
// threads may not be able to add items in order by time
val sortedByTime = buffer.sortBy(_.time)
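
On the BatchedWriteAheadLog side, the @volatile flag plus synchronized block is replaced by a single AtomicBoolean.getAndSet in close(). A small standalone sketch (not Spark code) of why that is enough under contention: getAndSet is an atomic read-modify-write, so even if several threads race into close(), exactly one observes the true-to-false transition and does the shutdown work, while the rest return immediately:

import java.util.concurrent.atomic.AtomicBoolean

object GetAndSetDemo {
  def main(args: Array[String]): Unit = {
    val active = new AtomicBoolean(true)
    val threads = (1 to 4).map { i =>
      new Thread(new Runnable {
        override def run(): Unit = {
          if (active.getAndSet(false)) println(s"thread $i performs the shutdown")
          else println(s"thread $i sees the log already closed and returns")
        }
      })
    }
    threads.foreach(_.start())
    threads.foreach(_.join())
  }
}

The early return in close() also means a second call never re-interrupts or re-joins the writer thread.
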
@@ -205,10 +205,12 @@ private[streaming] class FileBasedWriteAheadLog(

/** Stop the manager, close any open log writer */
def close(): Unit = synchronized {
if (currentLogWriter != null) {
currentLogWriter.close()
if (!executionContext.isShutdown) {
Inline review comment (Member):
Does this need to be thread-safe? it seems like the impl just above tries to be. I don't know if it's needed.

Reply (@HyukjinKwon, Member Author, Jun 10, 2017):
@srowen, maybe I missed your point. However, wouldn't there be a chance to try shutdown multiple times if isShutdown is not synchronised together? Probably, shutdown is idempotent itself but just wonder if it is okay to keep this as a safeguard.

if (currentLogWriter != null) {
currentLogWriter.close()
}
executionContext.shutdown()
}
executionContext.shutdown()
logInfo("Stopped write ahead log manager")
}

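
Regarding the review thread above: the JDK specifies that ExecutorService.shutdown() has no additional effect if the pool is already shut down, so repeated shutdown calls are safe on their own; the isShutdown guard (still inside the existing synchronized close()) chiefly ensures currentLogWriter.close() runs at most once. A quick standalone check of the JDK behaviour, unrelated to the Spark classes:

import java.util.concurrent.Executors

object ShutdownTwiceDemo {
  def main(args: Array[String]): Unit = {
    val pool = Executors.newFixedThreadPool(1)
    pool.shutdown()
    pool.shutdown()            // no additional effect on an already shut-down pool
    println(pool.isShutdown)   // prints: true
  }
}
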
@@ -57,6 +57,8 @@ class ReceiverTrackerSuite extends TestSuiteBase {
}
} finally {
tracker.stop(false)
// Make sure it is idempotent.
tracker.stop(false)
}
}
}
@@ -140,6 +140,8 @@ abstract class CommonWriteAheadLogTests(
}
}
writeAheadLog.close()
// Make sure it is idempotent.
writeAheadLog.close()
}

test(testPrefix + "handling file errors while reading rotating logs") {