Skip to content

Commit d84f98f

Browse files
committed
[SPARK-24485][SS] Measure and log elapsed time for filesystem operations in HDFSBackedStateStoreProvider
1 parent e76b012 commit d84f98f

File tree

3 files changed

+62
-41
lines changed

3 files changed

+62
-41
lines changed

core/src/main/scala/org/apache/spark/util/Utils.scala

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ import java.nio.file.Files
3030
import java.security.SecureRandom
3131
import java.util.{Locale, Properties, Random, UUID}
3232
import java.util.concurrent._
33+
import java.util.concurrent.TimeUnit.NANOSECONDS
3334
import java.util.concurrent.atomic.AtomicBoolean
3435
import java.util.zip.GZIPInputStream
3536

@@ -433,7 +434,7 @@ private[spark] object Utils extends Logging {
433434
new URI("file:///" + rawFileName).getPath.substring(1)
434435
}
435436

436-
/**
437+
/**
437438
* Download a file or directory to target directory. Supports fetching the file in a variety of
438439
* ways, including HTTP, Hadoop-compatible filesystems, and files on a standard filesystem, based
439440
* on the URL parameter. Fetching directories is only supported from Hadoop-compatible
@@ -506,6 +507,14 @@ private[spark] object Utils extends Logging {
506507
targetFile
507508
}
508509

510+
/**
 * Evaluates `body` once and reports how long the evaluation took.
 *
 * The elapsed time is the difference between two `System.nanoTime()` readings taken
 * immediately before and after `body` is evaluated, converted to milliseconds and clamped
 * so that the reported value is never negative.
 *
 * @param body by-name block to evaluate and time
 * @tparam T type of the value produced by `body`
 * @return a pair of the value produced by `body` and the elapsed time in milliseconds
 */
511+
def timeTakenMs[T](body: => T): (T, Long) = {
512+
val startTime = System.nanoTime()
513+
val result = body
514+
val endTime = System.nanoTime()
515+
(result, math.max(NANOSECONDS.toMillis(endTime - startTime), 0))
516+
}
517+
509518
/**
510519
* Download `in` to `tempFile`, then move it to `destFile`.
511520
*

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala

Lines changed: 50 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -18,12 +18,10 @@
1818
package org.apache.spark.sql.execution.streaming.state
1919

2020
import java.io._
21-
import java.nio.channels.ClosedChannelException
2221
import java.util.Locale
2322

2423
import scala.collection.JavaConverters._
2524
import scala.collection.mutable
26-
import scala.util.Random
2725
import scala.util.control.NonFatal
2826

2927
import com.google.common.io.ByteStreams
@@ -280,38 +278,49 @@ private[state] class HDFSBackedStateStoreProvider extends StateStoreProvider wit
280278
if (loadedCurrentVersionMap.isDefined) {
281279
return loadedCurrentVersionMap.get
282280
}
283-
val snapshotCurrentVersionMap = readSnapshotFile(version)
284-
if (snapshotCurrentVersionMap.isDefined) {
285-
synchronized { loadedMaps.put(version, snapshotCurrentVersionMap.get) }
286-
return snapshotCurrentVersionMap.get
287-
}
288281

289-
// Find the most recent map before this version that we can.
290-
// [SPARK-22305] This must be done iteratively to avoid stack overflow.
291-
var lastAvailableVersion = version
292-
var lastAvailableMap: Option[MapType] = None
293-
while (lastAvailableMap.isEmpty) {
294-
lastAvailableVersion -= 1
282+
logWarning(s"The state for version $version doesn't exist in loadedMaps. " +
283+
"Reading snapshot file and delta files if needed..." +
284+
"Note that this is normal for the first batch of starting query.")
295285

296-
if (lastAvailableVersion <= 0) {
297-
// Use an empty map for versions 0 or less.
298-
lastAvailableMap = Some(new MapType)
299-
} else {
300-
lastAvailableMap =
301-
synchronized { loadedMaps.get(lastAvailableVersion) }
302-
.orElse(readSnapshotFile(lastAvailableVersion))
286+
val (result, elapsedMs) = Utils.timeTakenMs {
287+
val snapshotCurrentVersionMap = readSnapshotFile(version)
288+
if (snapshotCurrentVersionMap.isDefined) {
289+
synchronized { loadedMaps.put(version, snapshotCurrentVersionMap.get) }
290+
return snapshotCurrentVersionMap.get
291+
}
292+
293+
// Find the most recent map before this version that we can load.
294+
// [SPARK-22305] This must be done iteratively to avoid stack overflow.
295+
var lastAvailableVersion = version
296+
var lastAvailableMap: Option[MapType] = None
297+
while (lastAvailableMap.isEmpty) {
298+
lastAvailableVersion -= 1
299+
300+
if (lastAvailableVersion <= 0) {
301+
// Use an empty map for versions 0 or less.
302+
lastAvailableMap = Some(new MapType)
303+
} else {
304+
lastAvailableMap =
305+
synchronized { loadedMaps.get(lastAvailableVersion) }
306+
.orElse(readSnapshotFile(lastAvailableVersion))
307+
}
308+
}
309+
310+
// Load all the deltas from the version after the last available one up to the target version.
311+
// The last available version is the one with a full snapshot, so it doesn't need deltas.
312+
val resultMap = new MapType(lastAvailableMap.get)
313+
for (deltaVersion <- lastAvailableVersion + 1 to version) {
314+
updateFromDeltaFile(deltaVersion, resultMap)
303315
}
304-
}
305316

306-
// Load all the deltas from the version after the last available one up to the target version.
307-
// The last available version is the one with a full snapshot, so it doesn't need deltas.
308-
val resultMap = new MapType(lastAvailableMap.get)
309-
for (deltaVersion <- lastAvailableVersion + 1 to version) {
310-
updateFromDeltaFile(deltaVersion, resultMap)
317+
synchronized { loadedMaps.put(version, resultMap) }
318+
resultMap
311319
}
312320

313-
synchronized { loadedMaps.put(version, resultMap) }
314-
resultMap
321+
logWarning(s"Loading state for $version takes $elapsedMs ms.")
322+
323+
result
315324
}
316325

317326
private def writeUpdateToDeltaFile(
@@ -490,15 +499,18 @@ private[state] class HDFSBackedStateStoreProvider extends StateStoreProvider wit
490499
/** Perform a snapshot of the store to allow delta files to be consolidated */
491500
private def doSnapshot(): Unit = {
492501
try {
493-
val files = fetchFiles()
502+
val (files, e1) = Utils.timeTakenMs(fetchFiles())
503+
logDebug(s"fetchFiles() took $e1 ms.")
504+
494505
if (files.nonEmpty) {
495506
val lastVersion = files.last.version
496507
val deltaFilesForLastVersion =
497508
filesForVersion(files, lastVersion).filter(_.isSnapshot == false)
498509
synchronized { loadedMaps.get(lastVersion) } match {
499510
case Some(map) =>
500511
if (deltaFilesForLastVersion.size > storeConf.minDeltasForSnapshot) {
501-
writeSnapshotFile(lastVersion, map)
512+
val (_, e2) = Utils.timeTakenMs(writeSnapshotFile(lastVersion, map))
513+
logDebug(s"writeSnapshotFile() took $e2 ms.")
502514
}
503515
case None =>
504516
// The last map is not loaded, probably some other instance is in charge
@@ -517,7 +529,9 @@ private[state] class HDFSBackedStateStoreProvider extends StateStoreProvider wit
517529
*/
518530
private[state] def cleanup(): Unit = {
519531
try {
520-
val files = fetchFiles()
532+
val (files, e1) = Utils.timeTakenMs(fetchFiles())
533+
logDebug(s"fetchFiles() took $e1 ms.")
534+
521535
if (files.nonEmpty) {
522536
val earliestVersionToRetain = files.last.version - storeConf.minVersionsToRetain
523537
if (earliestVersionToRetain > 0) {
@@ -527,9 +541,12 @@ private[state] class HDFSBackedStateStoreProvider extends StateStoreProvider wit
527541
mapsToRemove.foreach(loadedMaps.remove)
528542
}
529543
val filesToDelete = files.filter(_.version < earliestFileToRetain.version)
530-
filesToDelete.foreach { f =>
531-
fm.delete(f.path)
544+
val (_, e2) = Utils.timeTakenMs {
545+
filesToDelete.foreach { f =>
546+
fm.delete(f.path)
547+
}
532548
}
549+
logDebug(s"deleting files took $e2 ms.")
533550
logInfo(s"Deleted files older than ${earliestFileToRetain.version} for $this: " +
534551
filesToDelete.mkString(", "))
535552
}

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
3535
import org.apache.spark.sql.execution.streaming.state._
3636
import org.apache.spark.sql.streaming.{OutputMode, StateOperatorProgress}
3737
import org.apache.spark.sql.types._
38-
import org.apache.spark.util.{CompletionIterator, NextIterator}
38+
import org.apache.spark.util.{CompletionIterator, NextIterator, Utils}
3939

4040

4141
/** Used to identify the state store for a given operator. */
@@ -97,12 +97,7 @@ trait StateStoreWriter extends StatefulOperator { self: SparkPlan =>
9797
}
9898

9999
/** Records the duration of running `body` for the next query progress update. */
100-
protected def timeTakenMs(body: => Unit): Long = {
101-
val startTime = System.nanoTime()
102-
val result = body
103-
val endTime = System.nanoTime()
104-
math.max(NANOSECONDS.toMillis(endTime - startTime), 0)
105-
}
100+
protected def timeTakenMs(body: => Unit): Long = {
  // Delegate the measurement to the shared helper; `body` yields Unit, so only the
  // elapsed-milliseconds half of the returned pair is kept.
  val (_, elapsedMs) = Utils.timeTakenMs(body)
  elapsedMs
}
106101

107102
/**
108103
* Set the SQL metrics related to the state store.

0 commit comments

Comments
 (0)