11 changes: 10 additions & 1 deletion core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -30,6 +30,7 @@ import java.nio.file.Files
import java.security.SecureRandom
import java.util.{Locale, Properties, Random, UUID}
import java.util.concurrent._
import java.util.concurrent.TimeUnit.NANOSECONDS
import java.util.concurrent.atomic.AtomicBoolean
import java.util.zip.GZIPInputStream

@@ -433,7 +434,7 @@ private[spark] object Utils extends Logging {
new URI("file:///" + rawFileName).getPath.substring(1)
}

/**
* Download a file or directory to target directory. Supports fetching the file in a variety of
* ways, including HTTP, Hadoop-compatible filesystems, and files on a standard filesystem, based
* on the URL parameter. Fetching directories is only supported from Hadoop-compatible
@@ -506,6 +507,14 @@ private[spark] object Utils extends Logging {
targetFile
}

/** Records the duration of running `body`. */
def timeTakenMs[T](body: => T): (T, Long) = {
val startTime = System.nanoTime()
val result = body
val endTime = System.nanoTime()
(result, math.max(NANOSECONDS.toMillis(endTime - startTime), 0))
}

/**
* Download `in` to `tempFile`, then move it to `destFile`.
*
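For reference, the new helper times an arbitrary by-name body and returns both its result and the elapsed wall-clock milliseconds (clamped at zero). A minimal usage sketch; the workload is invented, and since Utils is private[spark] this only compiles inside Spark's own code base:

import org.apache.spark.util.Utils

// Time any expression; `body` is by-name, so it runs inside the helper.
val (sum, elapsedMs) = Utils.timeTakenMs {
  (1L to 1000000L).sum  // invented stand-in workload
}
println(s"Summing took $elapsedMs ms, result $sum")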
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
@@ -18,12 +18,10 @@
package org.apache.spark.sql.execution.streaming.state

import java.io._
import java.nio.channels.ClosedChannelException
import java.util.Locale

import scala.collection.JavaConverters._
import scala.collection.mutable
import scala.util.Random
import scala.util.control.NonFatal

import com.google.common.io.ByteStreams
@@ -280,38 +278,49 @@ private[state] class HDFSBackedStateStoreProvider extends StateStoreProvider wit
if (loadedCurrentVersionMap.isDefined) {
return loadedCurrentVersionMap.get
}
val snapshotCurrentVersionMap = readSnapshotFile(version)
if (snapshotCurrentVersionMap.isDefined) {
synchronized { loadedMaps.put(version, snapshotCurrentVersionMap.get) }
return snapshotCurrentVersionMap.get
}

// Find the most recent map before this version that we can.
// [SPARK-22305] This must be done iteratively to avoid stack overflow.
var lastAvailableVersion = version
var lastAvailableMap: Option[MapType] = None
while (lastAvailableMap.isEmpty) {
lastAvailableVersion -= 1
logWarning(s"The state for version $version doesn't exist in loadedMaps. " +
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@HeartSaVioR this is normal operation and not yet understand why use logWarning then. Can we lower this to debug or was there a reason to use warning? It's kinda overkill in unit tests...

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is not a normal if it's not just restored from checkpoint. If someone encounters the warning message while batches are running, it should be considered seriously because full state is being loaded from HDFS now though we expect state cache should contain it.

"Reading snapshot file and delta files if needed..." +
"Note that this is normal for the first batch of starting query.")

if (lastAvailableVersion <= 0) {
// Use an empty map for versions 0 or less.
lastAvailableMap = Some(new MapType)
} else {
lastAvailableMap =
synchronized { loadedMaps.get(lastAvailableVersion) }
.orElse(readSnapshotFile(lastAvailableVersion))
val (result, elapsedMs) = Utils.timeTakenMs {

Review comment (@jose-torres, Contributor, Jun 10, 2018):
GitHub has an... interesting idea of how to display this diff. The only change was the existing code moving inside timeTakenMs and adding the logWarning statements, correct?

Reply (Contributor Author):
Yup, right. Most of the code change is just wrapping the existing code in timeTakenMs.
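Schematically, the wrapping described above turns `val x = doWork()` into the following, where doWork stands in for the original method body:

val (x, elapsedMs) = Utils.timeTakenMs {
  doWork()  // unchanged original logic, now timed
}
logDebug(s"doWork() took $elapsedMs ms.")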

val snapshotCurrentVersionMap = readSnapshotFile(version)
if (snapshotCurrentVersionMap.isDefined) {
synchronized { loadedMaps.put(version, snapshotCurrentVersionMap.get) }
return snapshotCurrentVersionMap.get
}

// Find the most recent map before this version that we can.
// [SPARK-22305] This must be done iteratively to avoid stack overflow.
var lastAvailableVersion = version
var lastAvailableMap: Option[MapType] = None
while (lastAvailableMap.isEmpty) {
lastAvailableVersion -= 1

if (lastAvailableVersion <= 0) {
// Use an empty map for versions 0 or less.
lastAvailableMap = Some(new MapType)
} else {
lastAvailableMap =
synchronized { loadedMaps.get(lastAvailableVersion) }
.orElse(readSnapshotFile(lastAvailableVersion))
}
}

// Load all the deltas from the version after the last available one up to the target version.
// The last available version is the one with a full snapshot, so it doesn't need deltas.
val resultMap = new MapType(lastAvailableMap.get)
for (deltaVersion <- lastAvailableVersion + 1 to version) {
updateFromDeltaFile(deltaVersion, resultMap)
}
}

// Load all the deltas from the version after the last available one up to the target version.
// The last available version is the one with a full snapshot, so it doesn't need deltas.
val resultMap = new MapType(lastAvailableMap.get)
for (deltaVersion <- lastAvailableVersion + 1 to version) {
updateFromDeltaFile(deltaVersion, resultMap)
synchronized { loadedMaps.put(version, resultMap) }
resultMap
}

synchronized { loadedMaps.put(version, resultMap) }
resultMap
logDebug(s"Loading state for $version takes $elapsedMs ms.")

result
}
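The recovery strategy itself is unchanged by this patch: walk backwards from the requested version until a cached map or snapshot turns up, then replay delta files forward. A condensed sketch under simplified types; readSnapshot and applyDelta are hypothetical stand-ins for readSnapshotFile and updateFromDeltaFile, and the cache lookups and defensive copy of the base map are omitted:

type StateMap = scala.collection.mutable.HashMap[String, String]

def readSnapshot(version: Long): Option[StateMap] = ???   // full state, if a snapshot file exists
def applyDelta(version: Long, map: StateMap): Unit = ???  // replay one version's changes into map

def loadState(version: Long): StateMap = {
  // Walk backwards iteratively (not recursively, per SPARK-22305) until some
  // earlier version yields a full snapshot; version 0 or less is the empty map.
  var lastAvailableVersion = version
  var base: Option[StateMap] = None
  while (base.isEmpty) {
    lastAvailableVersion -= 1
    base = if (lastAvailableVersion <= 0) Some(new StateMap) else readSnapshot(lastAvailableVersion)
  }
  // Replay deltas from just after the snapshot up to the requested version.
  val result = base.get
  for (deltaVersion <- lastAvailableVersion + 1 to version) {
    applyDelta(deltaVersion, result)
  }
  result
}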

private def writeUpdateToDeltaFile(
@@ -490,15 +499,18 @@ private[state] class HDFSBackedStateStoreProvider extends StateStoreProvider wit
/** Perform a snapshot of the store to allow delta files to be consolidated */
private def doSnapshot(): Unit = {
try {
val files = fetchFiles()
val (files, e1) = Utils.timeTakenMs(fetchFiles())
logDebug(s"fetchFiles() took $e1 ms.")

if (files.nonEmpty) {
val lastVersion = files.last.version
val deltaFilesForLastVersion =
filesForVersion(files, lastVersion).filter(_.isSnapshot == false)
synchronized { loadedMaps.get(lastVersion) } match {
case Some(map) =>
if (deltaFilesForLastVersion.size > storeConf.minDeltasForSnapshot) {
writeSnapshotFile(lastVersion, map)
val (_, e2) = Utils.timeTakenMs(writeSnapshotFile(lastVersion, map))
logDebug(s"writeSnapshotFile() took $e2 ms.")
}
case None =>
// The last map is not loaded, probably some other instance is in charge
@@ -517,7 +529,9 @@ private[state] class HDFSBackedStateStoreProvider extends StateStoreProvider wit
*/
private[state] def cleanup(): Unit = {
try {
val files = fetchFiles()
val (files, e1) = Utils.timeTakenMs(fetchFiles())
logDebug(s"fetchFiles() took $e1 ms.")

if (files.nonEmpty) {
val earliestVersionToRetain = files.last.version - storeConf.minVersionsToRetain
if (earliestVersionToRetain > 0) {
@@ -527,9 +541,12 @@ private[state] class HDFSBackedStateStoreProvider extends StateStoreProvider wit
mapsToRemove.foreach(loadedMaps.remove)
}
val filesToDelete = files.filter(_.version < earliestFileToRetain.version)
filesToDelete.foreach { f =>
fm.delete(f.path)
val (_, e2) = Utils.timeTakenMs {
filesToDelete.foreach { f =>
fm.delete(f.path)
}
}
logDebug(s"deleting files took $e2 ms.")
logInfo(s"Deleted files older than ${earliestFileToRetain.version} for $this: " +
filesToDelete.mkString(", "))
}
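For intuition, cleanup() retains the last minVersionsToRetain versions and deletes files strictly older than the cutoff. A toy illustration of the cutoff arithmetic with invented values (the real code additionally resolves the earliest snapshot file for that version via filesForVersion):

case class StoreFile(version: Long, isSnapshot: Boolean)

val minVersionsToRetain = 2  // invented config value
val files = Seq(StoreFile(7, false), StoreFile(8, true), StoreFile(9, false), StoreFile(10, false))

val earliestVersionToRetain = files.last.version - minVersionsToRetain  // 10 - 2 = 8
val filesToDelete = files.filter(_.version < earliestVersionToRetain)
// filesToDelete == Seq(StoreFile(7, false)); versions 8, 9, 10 are kept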
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala
@@ -35,7 +35,7 @@ import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
import org.apache.spark.sql.execution.streaming.state._
import org.apache.spark.sql.streaming.{OutputMode, StateOperatorProgress}
import org.apache.spark.sql.types._
import org.apache.spark.util.{CompletionIterator, NextIterator}
import org.apache.spark.util.{CompletionIterator, NextIterator, Utils}


/** Used to identify the state store for a given operator. */
@@ -97,12 +97,7 @@ trait StateStoreWriter extends StatefulOperator { self: SparkPlan =>
}

/** Records the duration of running `body` for the next query progress update. */
protected def timeTakenMs(body: => Unit): Long = {
val startTime = System.nanoTime()
val result = body
val endTime = System.nanoTime()
math.max(NANOSECONDS.toMillis(endTime - startTime), 0)
}
protected def timeTakenMs(body: => Unit): Long = Utils.timeTakenMs(body)._2

/**
* Set the SQL metrics related to the state store.
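The operator-facing signature is unchanged (a Unit body in, elapsed millis out); only the implementation now delegates to the shared helper. In the stateful operators the value typically feeds a SQL metric, roughly as below; Metric is a simplified stand-in for Spark's SQLMetric, and the workload is invented:

import org.apache.spark.util.Utils

// Stand-in for SQLMetric: an accumulator that sums recorded durations.
final class Metric { private var v = 0L; def +=(d: Long): Unit = v += d; def value: Long = v }

def timeTakenMs(body: => Unit): Long = Utils.timeTakenMs(body)._2

val allUpdatesTimeMs = new Metric
allUpdatesTimeMs += timeTakenMs {
  Thread.sleep(10)  // stand-in for applying state updates
}
println(s"total update time: ${allUpdatesTimeMs.value} ms")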