From df7b339d73097b8501fe0937f770b8b2ded1b63e Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Tue, 10 Apr 2018 21:21:14 -0700 Subject: [PATCH 1/9] CheckpointFileManager --- .../apache/spark/sql/internal/SQLConf.scala | 7 + .../streaming/CheckpointFileManager.scala | 374 ++++++++++++++++++ .../execution/streaming/HDFSMetadataLog.scala | 213 +--------- .../state/HDFSBackedStateStoreProvider.scala | 124 +++--- .../streaming/state/StateStore.scala | 4 +- .../CheckpointFileManagerSuite.scala | 192 +++++++++ .../streaming/HDFSMetadataLogSuite.scala | 103 +---- .../streaming/state/StateStoreSuite.scala | 56 ++- 8 files changed, 697 insertions(+), 376 deletions(-) create mode 100644 sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManager.scala create mode 100644 sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManagerSuite.scala diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 11864bd1b1847..db87c57689307 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -913,6 +913,13 @@ object SQLConf { .intConf .createWithDefault(100) + val STREAMING_CHECKPOINT_FILE_MANAGER_CLASS = + buildConf("spark.sql.streaming.checkpointFileManagerClass") + .doc("The class used to write checkpoint files atomically. This class must be a subclass " + + "of the interface CheckpointFileManager.") + .internal() + .stringConf + val NDV_MAX_ERROR = buildConf("spark.sql.statistics.ndv.maxError") .internal() diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManager.scala new file mode 100644 index 0000000000000..e5338dff4c57e --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManager.scala @@ -0,0 +1,374 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.execution.streaming + +import java.io.{FileSystem => _, _} +import java.util.{EnumSet, UUID} + +import scala.util.control.NonFatal + +import org.apache.commons.io.IOUtils +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs._ +import org.apache.hadoop.fs.local.{LocalFs, RawLocalFs} +import org.apache.hadoop.fs.permission.FsPermission + +import org.apache.spark.internal.Logging +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.util.Utils + +/** + * An interface to abstract out all operation related to streaming checkpoints. 
Most importantly, + * the key operation this interface provides is `createAtomic(path, overwrite)` which returns a + * `CancellableFSDataOutputStream`. This method is used by [[HDFSMetadataLog]] and + * [[org.apache.spark.sql.execution.streaming.state.StateStore StateStore]] implementations + * to write a complete checkpoint file atomically (i.e. no partial file will be visible), with or + * without overwrite. + * + * This higher-level interface above the Hadoop FileSystem is necessary because + * different implementation of FileSystem/FileContext may have different combination of operations + * to provide the desired atomic guarantees (e.g. write-to-temp-file-and-rename, + * direct-write-and-cancel-on-failure) and this abstraction allow different implementations while + * keeping the usage simple (`createAtomic` -> `close` or `cancel`). + */ +trait CheckpointFileManager { + + import org.apache.spark.sql.execution.streaming.CheckpointFileManager._ + + /** List the files in a path that match a filter. */ + def list(path: Path, filter: PathFilter): Array[FileStatus] + + /** List all the files in a path. */ + def list(path: Path): Array[FileStatus] = { + list(path, new PathFilter { override def accept(path: Path): Boolean = true }) + } + + /** Make directory at the give path and all its parent directories as needed. */ + def mkdirs(path: Path): Unit + + /** Whether path exists */ + def exists(path: Path): Boolean + + /** Create a file. */ + def create(path: Path, overwrite: Boolean): FSDataOutputStream + + /** Create a file and make its contents available atomically after the output stream is closed. */ + def createAtomic(path: Path, overwrite: Boolean): CancellableFSDataOutputStream + + /** Open a file for reading, or throw exception if it does not exist. */ + def open(path: Path): FSDataInputStream + + /** Rename a file. */ + def rename(srcPath: Path, dstPath: Path, overwrite: Boolean): Unit + + /** Recursively delete a path if it exists. Should not throw exception if file doesn't exist. */ + def delete(path: Path): Unit + + /** Copy a local file to a remote file. */ + def copyFromLocalFile(localSrcFile: File, destPath: Path): Unit + + /** Copy a remote file to the local file. */ + def copyToLocalFile(srcPath: Path, localDestFile: File): Unit + + /** Is the default file system this implementation is operating on the local file system. */ + def isLocal: Boolean +} + +object CheckpointFileManager extends Logging { + + /** + * An interface to add the cancel() operation to [[FSDataOutputStream]]. This is used + * mainly by `CheckpointFileManager.createAtomic` to write a file atomically. + * + * @see [[CheckpointFileManager]]. + */ + abstract class CancellableFSDataOutputStream(protected val underlyingStream: OutputStream) + extends FSDataOutputStream(underlyingStream, null) { + /** Cancel the `underlyingStream` and ensure that the output file is not generated. */ + def cancel(): Unit + } + + /** + * An implementation of [[CancellableFSDataOutputStream]] that writes a file atomically by writing + * to a temporary file and then renames. 
+ */ + sealed class RenameBasedFSDataOutputStream( + fm: CheckpointFileManager, + finalPath: Path, + tempPath: Path, + overwrite: Boolean) + extends CancellableFSDataOutputStream(fm.create(tempPath, overwrite)) { + + def this(fm: CheckpointFileManager, path: Path, overwrite: Boolean) = { + this(fm, path, generateTempPath(path), overwrite) + } + + logInfo(s"Writing atomically to $finalPath using temp file $tempPath") + @volatile private var terminated = false + + override def close(): Unit = synchronized { + try { + if (terminated) return + super.close() + fm.rename(tempPath, finalPath, overwrite) + logInfo(s"Renamed temp file $tempPath to $finalPath") + } finally { + terminated = true + } + } + + override def cancel(): Unit = synchronized { + try { + if (terminated) return + underlyingStream.close() + fm.delete(tempPath) + } catch { + case NonFatal(e) => + logWarning(s"Error cancelling write to $finalPath", e) + } finally { + terminated = true + } + } + } + + + /** Create an instance of [[CheckpointFileManager]] based on the path and configuration. */ + def create(path: Path, hadoopConf: Configuration): CheckpointFileManager = { + val fileManagerClass = hadoopConf.get( + SQLConf.STREAMING_CHECKPOINT_FILE_MANAGER_CLASS.parent.key) + if (fileManagerClass != null) { + return Utils.classForName(fileManagerClass) + .getConstructor(classOf[Path], classOf[Configuration]) + .newInstance(path, hadoopConf) + .asInstanceOf[CheckpointFileManager] + } + try { + // Try to create a manager based on `FileContext` because HDFS's `FileContext.rename() + // gives atomic renames, which is what we rely on for the default implementation + // `CheckpointFileManager.createAtomic`. + new FileContextBasedCheckpointFileManager(path, hadoopConf) + } catch { + case e: UnsupportedFileSystemException => + logWarning( + "Could not use FileContext API for managing metadata log files at path " + + s"$path. Using FileSystem API instead for managing log files. The log may be " + + s"inconsistent under failures.") + new FileSystemBasedCheckpointFileManager(path, hadoopConf) + } + new FileSystemBasedCheckpointFileManager(path, hadoopConf) + } + + private def generateTempPath(path: Path): Path = { + val tc = org.apache.spark.TaskContext.get + val tid = if (tc != null) ".TID" + tc.taskAttemptId else "" + new Path(path.getParent, s".${path.getName}.${UUID.randomUUID}${tid}.tmp") + } +} + + +/** An implementation of [[CheckpointFileManager]] using Hadoop's [[FileSystem]] API. 
*/ +class FileSystemBasedCheckpointFileManager(path: Path, hadoopConf: Configuration) + extends CheckpointFileManager with Logging { + + import CheckpointFileManager._ + + protected val fs = path.getFileSystem(hadoopConf) + + fs.setVerifyChecksum(false) + fs.setWriteChecksum(false) + + override def list(path: Path, filter: PathFilter): Array[FileStatus] = { + fs.listStatus(path, filter) + } + + override def mkdirs(path: Path): Unit = { + fs.mkdirs(path, FsPermission.getDirDefault) + } + + override def create(path: Path, overwrite: Boolean): FSDataOutputStream = { + fs.create(path, overwrite) + } + + override def createAtomic(path: Path, overwrite: Boolean): CancellableFSDataOutputStream = { + new RenameBasedFSDataOutputStream(this, path, overwrite) + } + + override def open(path: Path): FSDataInputStream = { + fs.open(path) + } + + override def exists(path: Path): Boolean = { + fs.exists(path) + } + + override def rename(srcPath: Path, dstPath: Path, overwrite: Boolean): Unit = { + if (!overwrite && fs.exists(dstPath)) { + throw new FileAlreadyExistsException( + s"Failed to rename $srcPath to $dstPath as destination already exists") + } + + def deleteAndRename(prevException: Exception): Unit = { + if (overwrite) { + try { + if (fs.delete(dstPath, true)) { + logWarning(s"Failed to delete $dstPath before second attempt to rename") + } + if (!fs.rename(srcPath, dstPath)) { + val msg = s"Failed to rename temp file $srcPath to $dstPath as second attempt to" + + s"rename (after delete) returned false" + logWarning(msg) + val e = new IOException(msg) + e.addSuppressed(prevException) + throw e + } + } catch { + case NonFatal(e) => + logError(s"Failed to write atomically to $dstPath", e) + if (prevException != null) e.addSuppressed(prevException) + throw e + } + } else { + throw prevException + } + } + + try { + if (!fs.rename(srcPath, dstPath)) { + val msg = s"Failed to rename temp file $srcPath to $dstPath as rename returned false" + logWarning(msg) + deleteAndRename(new IOException(msg)) + } + } catch { + case fe: FileAlreadyExistsException => + logWarning(s"Failed to rename temp file $srcPath to $dstPath because file exists", fe) + deleteAndRename(fe) + } + } + + override def delete(path: Path): Unit = { + try { + fs.delete(path, true) + } catch { + case e: FileNotFoundException => + logInfo(s"Failed to delete $path as it does not exist") + // ignore if file has already been deleted + } + } + + override def copyFromLocalFile(localSrcFile: File, destPath: Path): Unit = { + fs.copyFromLocalFile(new Path(localSrcFile.getAbsoluteFile.toURI), destPath) + } + + override def copyToLocalFile(srcPath: Path, localDestFile: File): Unit = { + fs.copyToLocalFile(srcPath, new Path(localDestFile.getAbsoluteFile.toURI)) + } + + override def isLocal: Boolean = fs match { + case _: LocalFileSystem | _: RawLocalFileSystem => true + case _ => false + } +} + + +/** An implementation of [[CheckpointFileManager]] using Hadoop's [[FileContext]] API. 
*/ +class FileContextBasedCheckpointFileManager(path: Path, hadoopConf: Configuration) + extends CheckpointFileManager with Logging { + + import CheckpointFileManager._ + + private val fc = if (path.toUri.getScheme == null) { + FileContext.getFileContext(hadoopConf) + } else { + FileContext.getFileContext(path.toUri, hadoopConf) + } + + override def list(path: Path, filter: PathFilter): Array[FileStatus] = { + fc.util.listStatus(path, filter) + } + + override def mkdirs(path: Path): Unit = { + fc.mkdir(path, FsPermission.getDirDefault, true) + } + + override def create(path: Path, overwrite: Boolean): FSDataOutputStream = { + import CreateFlag._ + val flags = if (overwrite) EnumSet.of(CREATE, OVERWRITE) else EnumSet.of(CREATE) + fc.create(path, flags) + } + + override def createAtomic(path: Path, overwrite: Boolean): CancellableFSDataOutputStream = { + new RenameBasedFSDataOutputStream(this, path, overwrite) + } + + override def open(path: Path): FSDataInputStream = { + fc.open(path) + } + + override def exists(path: Path): Boolean = { + fc.util.exists(path) + } + + override def rename(srcPath: Path, dstPath: Path, overwrite: Boolean): Unit = { + import Options.Rename._ + fc.rename(srcPath, dstPath, if (overwrite) OVERWRITE else NONE) + } + + + override def delete(path: Path): Unit = { + try { + fc.delete(path, true) + } catch { + case e: FileNotFoundException => + // ignore if file has already been deleted + } + } + + override def copyFromLocalFile(localSrcFile: File, destPath: Path): Unit = { + val localFc = FileContext.getLocalFSFileContext + var in: InputStream = null + var out: OutputStream = null + try { + in = localFc.open(new Path(localSrcFile.getAbsoluteFile.toURI)) + out = fc.create(destPath, EnumSet.of(CreateFlag.CREATE)) + IOUtils.copyLarge(in, out) + } finally { + if (in != null) in.close() + if (out != null) out.close() + } + } + + override def copyToLocalFile(srcPath: Path, localDstFile: File): Unit = { + val localFc = FileContext.getLocalFSFileContext + var in: InputStream = null + var out: OutputStream = null + try { + in = fc.open(srcPath) + out = localFc.create( + new Path(localDstFile.getAbsoluteFile.toURI), EnumSet.of(CreateFlag.CREATE)) + IOUtils.copyLarge(in, out) + } finally { + if (in != null) in.close() + if (out != null) out.close() + } + } + + override def isLocal: Boolean = fc.getDefaultFileSystem match { + case _: LocalFs | _: RawLocalFs => true // LocalFs = RawLocalFs + ChecksumFs + case _ => false + } +} + diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala index 00bc215a5dc8c..faae8cbbccd5a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala @@ -57,10 +57,10 @@ class HDFSMetadataLog[T <: AnyRef : ClassTag](sparkSession: SparkSession, path: require(implicitly[ClassTag[T]].runtimeClass != classOf[Seq[_]], "Should not create a log with type Seq, use Arrays instead - see SPARK-17372") - import HDFSMetadataLog._ - val metadataPath = new Path(path) - protected val fileManager = createFileManager() + + protected val fileManager = + CheckpointFileManager.create(metadataPath, sparkSession.sessionState.newHadoopConf) if (!fileManager.exists(metadataPath)) { fileManager.mkdirs(metadataPath) @@ -109,68 +109,31 @@ class HDFSMetadataLog[T <: AnyRef : ClassTag](sparkSession: SparkSession, path: 
require(metadata != null, "'null' metadata cannot written to a metadata log") get(batchId).map(_ => false).getOrElse { // Only write metadata when the batch has not yet been written - writeBatch(batchId, metadata) + writeBatchToFile(metadata, batchIdToPath(batchId)) true } } - private def writeTempBatch(metadata: T): Option[Path] = { - while (true) { - val tempPath = new Path(metadataPath, s".${UUID.randomUUID.toString}.tmp") - try { - val output = fileManager.create(tempPath) - try { - serialize(metadata, output) - return Some(tempPath) - } finally { - output.close() - } - } catch { - case e: FileAlreadyExistsException => - // Failed to create "tempPath". There are two cases: - // 1. Someone is creating "tempPath" too. - // 2. This is a restart. "tempPath" has already been created but not moved to the final - // batch file (not committed). - // - // For both cases, the batch has not yet been committed. So we can retry it. - // - // Note: there is a potential risk here: if HDFSMetadataLog A is running, people can use - // the same metadata path to create "HDFSMetadataLog" and fail A. However, this is not a - // big problem because it requires the attacker must have the permission to write the - // metadata path. In addition, the old Streaming also have this issue, people can create - // malicious checkpoint files to crash a Streaming application too. - } - } - None - } - - /** - * Write a batch to a temp file then rename it to the batch file. + /** Write a batch to a temp file then rename it to the batch file. * * There may be multiple [[HDFSMetadataLog]] using the same metadata path. Although it is not a * valid behavior, we still need to prevent it from destroying the files. */ - private def writeBatch(batchId: Long, metadata: T): Unit = { - val tempPath = writeTempBatch(metadata).getOrElse( - throw new IllegalStateException(s"Unable to create temp batch file $batchId")) + private def writeBatchToFile(metadata: T, path: Path): Unit = { + val output = fileManager.createAtomic(path, overwrite = false) try { - // Try to commit the batch - // It will fail if there is an existing file (someone has committed the batch) - logDebug(s"Attempting to write log #${batchIdToPath(batchId)}") - fileManager.rename(tempPath, batchIdToPath(batchId)) - - // SPARK-17475: HDFSMetadataLog should not leak CRC files - // If the underlying filesystem didn't rename the CRC file, delete it. - val crcPath = new Path(tempPath.getParent(), s".${tempPath.getName()}.crc") - if (fileManager.exists(crcPath)) fileManager.delete(crcPath) + serialize(metadata, output) + output.close() } catch { case e: FileAlreadyExistsException => - // If "rename" fails, it means some other "HDFSMetadataLog" has committed the batch. - // So throw an exception to tell the user this is not a valid behavior. + output.cancel() + // If next batch file already exists, then another concurrently running query has + // written it. 
throw new ConcurrentModificationException( - s"Multiple HDFSMetadataLog are using $path", e) - } finally { - fileManager.delete(tempPath) + s"Multiple streaming queries are concurrently using $path", e) + case e: Throwable => + output.cancel() + throw e } } @@ -219,7 +182,7 @@ class HDFSMetadataLog[T <: AnyRef : ClassTag](sparkSession: SparkSession, path: (endId.isEmpty || batchId <= endId.get) && (startId.isEmpty || batchId >= startId.get) }.sorted - verifyBatchIds(batchIds, startId, endId) + HDFSMetadataLog.verifyBatchIds(batchIds, startId, endId) batchIds.map(batchId => (batchId, get(batchId))).filter(_._2.isDefined).map { case (batchId, metadataOption) => @@ -280,19 +243,6 @@ class HDFSMetadataLog[T <: AnyRef : ClassTag](sparkSession: SparkSession, path: } } - private def createFileManager(): FileManager = { - val hadoopConf = sparkSession.sessionState.newHadoopConf() - try { - new FileContextManager(metadataPath, hadoopConf) - } catch { - case e: UnsupportedFileSystemException => - logWarning("Could not use FileContext API for managing metadata log files at path " + - s"$metadataPath. Using FileSystem API instead for managing log files. The log may be " + - s"inconsistent under failures.") - new FileSystemManager(metadataPath, hadoopConf) - } - } - /** * Parse the log version from the given `text` -- will throw exception when the parsed version * exceeds `maxSupportedVersion`, or when `text` is malformed (such as "xyz", "v", "v-1", @@ -327,135 +277,6 @@ class HDFSMetadataLog[T <: AnyRef : ClassTag](sparkSession: SparkSession, path: object HDFSMetadataLog { - /** A simple trait to abstract out the file management operations needed by HDFSMetadataLog. */ - trait FileManager { - - /** List the files in a path that match a filter. */ - def list(path: Path, filter: PathFilter): Array[FileStatus] - - /** Make directory at the give path and all its parent directories as needed. */ - def mkdirs(path: Path): Unit - - /** Whether path exists */ - def exists(path: Path): Boolean - - /** Open a file for reading, or throw exception if it does not exist. */ - def open(path: Path): FSDataInputStream - - /** Create path, or throw exception if it already exists */ - def create(path: Path): FSDataOutputStream - - /** - * Atomically rename path, or throw exception if it cannot be done. - * Should throw FileNotFoundException if srcPath does not exist. - * Should throw FileAlreadyExistsException if destPath already exists. - */ - def rename(srcPath: Path, destPath: Path): Unit - - /** Recursively delete a path if it exists. Should not throw exception if file doesn't exist. */ - def delete(path: Path): Unit - } - - /** - * Default implementation of FileManager using newer FileContext API. 
- */ - class FileContextManager(path: Path, hadoopConf: Configuration) extends FileManager { - private val fc = if (path.toUri.getScheme == null) { - FileContext.getFileContext(hadoopConf) - } else { - FileContext.getFileContext(path.toUri, hadoopConf) - } - - override def list(path: Path, filter: PathFilter): Array[FileStatus] = { - fc.util.listStatus(path, filter) - } - - override def rename(srcPath: Path, destPath: Path): Unit = { - fc.rename(srcPath, destPath) - } - - override def mkdirs(path: Path): Unit = { - fc.mkdir(path, FsPermission.getDirDefault, true) - } - - override def open(path: Path): FSDataInputStream = { - fc.open(path) - } - - override def create(path: Path): FSDataOutputStream = { - fc.create(path, EnumSet.of(CreateFlag.CREATE)) - } - - override def exists(path: Path): Boolean = { - fc.util().exists(path) - } - - override def delete(path: Path): Unit = { - try { - fc.delete(path, true) - } catch { - case e: FileNotFoundException => - // ignore if file has already been deleted - } - } - } - - /** - * Implementation of FileManager using older FileSystem API. Note that this implementation - * cannot provide atomic renaming of paths, hence can lead to consistency issues. This - * should be used only as a backup option, when FileContextManager cannot be used. - */ - class FileSystemManager(path: Path, hadoopConf: Configuration) extends FileManager { - private val fs = path.getFileSystem(hadoopConf) - - override def list(path: Path, filter: PathFilter): Array[FileStatus] = { - fs.listStatus(path, filter) - } - - /** - * Rename a path. Note that this implementation is not atomic. - * @throws FileNotFoundException if source path does not exist. - * @throws FileAlreadyExistsException if destination path already exists. - * @throws IOException if renaming fails for some unknown reason. - */ - override def rename(srcPath: Path, destPath: Path): Unit = { - if (!fs.exists(srcPath)) { - throw new FileNotFoundException(s"Source path does not exist: $srcPath") - } - if (fs.exists(destPath)) { - throw new FileAlreadyExistsException(s"Destination path already exists: $destPath") - } - if (!fs.rename(srcPath, destPath)) { - throw new IOException(s"Failed to rename $srcPath to $destPath") - } - } - - override def mkdirs(path: Path): Unit = { - fs.mkdirs(path, FsPermission.getDirDefault) - } - - override def open(path: Path): FSDataInputStream = { - fs.open(path) - } - - override def create(path: Path): FSDataOutputStream = { - fs.create(path, false) - } - - override def exists(path: Path): Boolean = { - fs.exists(path) - } - - override def delete(path: Path): Unit = { - try { - fs.delete(path, true) - } catch { - case e: FileNotFoundException => - // ignore if file has already been deleted - } - } - } - /** * Verify if batchIds are continuous and between `startId` and `endId`. 
* diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala index 3f5002a4e6937..63158fe4f8bfa 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala @@ -17,7 +17,7 @@ package org.apache.spark.sql.execution.streaming.state -import java.io.{DataInputStream, DataOutputStream, FileNotFoundException, IOException} +import java.io._ import java.nio.channels.ClosedChannelException import java.util.Locale @@ -27,13 +27,16 @@ import scala.util.Random import scala.util.control.NonFatal import com.google.common.io.ByteStreams +import org.apache.commons.io.IOUtils import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.fs.{FileStatus, Path} +import org.apache.hadoop.fs._ import org.apache.spark.{SparkConf, SparkEnv} import org.apache.spark.internal.Logging import org.apache.spark.io.LZ4CompressionCodec import org.apache.spark.sql.catalyst.expressions.UnsafeRow +import org.apache.spark.sql.execution.streaming.CheckpointFileManager +import org.apache.spark.sql.execution.streaming.CheckpointFileManager.CancellableFSDataOutputStream import org.apache.spark.sql.types.StructType import org.apache.spark.util.{SizeEstimator, Utils} @@ -87,10 +90,10 @@ private[state] class HDFSBackedStateStoreProvider extends StateStoreProvider wit case object ABORTED extends STATE private val newVersion = version + 1 - private val tempDeltaFile = new Path(baseDir, s"temp-${Random.nextLong}") - private lazy val tempDeltaFileStream = compressStream(fs.create(tempDeltaFile, true)) @volatile private var state: STATE = UPDATING - @volatile private var finalDeltaFile: Path = null + private val finalDeltaFile: Path = deltaFile(newVersion) + private lazy val deltaFileStream = fm.createAtomic(finalDeltaFile, overwrite = true) + private lazy val compressedStream = compressStream(deltaFileStream) override def id: StateStoreId = HDFSBackedStateStoreProvider.this.stateStoreId @@ -103,14 +106,14 @@ private[state] class HDFSBackedStateStoreProvider extends StateStoreProvider wit val keyCopy = key.copy() val valueCopy = value.copy() mapToUpdate.put(keyCopy, valueCopy) - writeUpdateToDeltaFile(tempDeltaFileStream, keyCopy, valueCopy) + writeUpdateToDeltaFile(compressedStream, keyCopy, valueCopy) } override def remove(key: UnsafeRow): Unit = { verify(state == UPDATING, "Cannot remove after already committed or aborted") val prevValue = mapToUpdate.remove(key) if (prevValue != null) { - writeRemoveToDeltaFile(tempDeltaFileStream, key) + writeRemoveToDeltaFile(compressedStream, key) } } @@ -126,8 +129,8 @@ private[state] class HDFSBackedStateStoreProvider extends StateStoreProvider wit verify(state == UPDATING, "Cannot commit after already committed or aborted") try { - finalizeDeltaFile(tempDeltaFileStream) - finalDeltaFile = commitUpdates(newVersion, mapToUpdate, tempDeltaFile) + finalizeDeltaFile(compressedStream) + loadedMaps.put(newVersion, mapToUpdate) state = COMMITTED logInfo(s"Committed version $newVersion for $this to file $finalDeltaFile") newVersion @@ -140,23 +143,14 @@ private[state] class HDFSBackedStateStoreProvider extends StateStoreProvider wit /** Abort all the updates made on this store. This store will not be usable any more. 
*/ override def abort(): Unit = { - verify(state == UPDATING || state == ABORTED, "Cannot abort after already committed") - try { + // This if statement is to ensure that files are deleted only if there are changes to the + // StateStore. We have two StateStores for each task, one which is used only for reading, and + // the other used for read+write. We don't want the read-only to delete state files. + if (state == UPDATING) { + state = ABORTED + cancelIfPossible(compressedStream, deltaFileStream) + } else { state = ABORTED - if (tempDeltaFileStream != null) { - tempDeltaFileStream.close() - } - if (tempDeltaFile != null) { - fs.delete(tempDeltaFile, true) - } - } catch { - case c: ClosedChannelException => - // This can happen when underlying file output stream has been closed before the - // compression stream. - logDebug(s"Error aborting version $newVersion into $this", c) - - case e: Exception => - logWarning(s"Error aborting version $newVersion into $this", e) } logInfo(s"Aborted version $newVersion for $this") } @@ -212,7 +206,7 @@ private[state] class HDFSBackedStateStoreProvider extends StateStoreProvider wit this.valueSchema = valueSchema this.storeConf = storeConf this.hadoopConf = hadoopConf - fs.mkdirs(baseDir) + fm.mkdirs(baseDir) } override def stateStoreId: StateStoreId = stateStoreId_ @@ -251,34 +245,11 @@ private[state] class HDFSBackedStateStoreProvider extends StateStoreProvider wit private lazy val loadedMaps = new mutable.HashMap[Long, MapType] private lazy val baseDir = stateStoreId.storeCheckpointLocation() - private lazy val fs = baseDir.getFileSystem(hadoopConf) + private lazy val fm = CheckpointFileManager.create(baseDir, hadoopConf) private lazy val sparkConf = Option(SparkEnv.get).map(_.conf).getOrElse(new SparkConf) private case class StoreFile(version: Long, path: Path, isSnapshot: Boolean) - /** Commit a set of updates to the store with the given new version */ - private def commitUpdates(newVersion: Long, map: MapType, tempDeltaFile: Path): Path = { - synchronized { - val finalDeltaFile = deltaFile(newVersion) - - // scalastyle:off - // Renaming a file atop an existing one fails on HDFS - // (http://hadoop.apache.org/docs/stable/hadoop-project-dist/hadoop-common/filesystem/filesystem.html). - // Hence we should either skip the rename step or delete the target file. Because deleting the - // target file will break speculation, skipping the rename step is the only choice. It's still - // semantically correct because Structured Streaming requires rerunning a batch should - // generate the same output. (SPARK-19677) - // scalastyle:on - if (fs.exists(finalDeltaFile)) { - fs.delete(tempDeltaFile, true) - } else if (!fs.rename(tempDeltaFile, finalDeltaFile)) { - throw new IOException(s"Failed to rename $tempDeltaFile to $finalDeltaFile") - } - loadedMaps.put(newVersion, map) - finalDeltaFile - } - } - /** * Get iterator of all the data of the latest version of the store. * Note that this will look up the files to determined the latest known version. 
@@ -365,7 +336,7 @@ private[state] class HDFSBackedStateStoreProvider extends StateStoreProvider wit val fileToRead = deltaFile(version) var input: DataInputStream = null val sourceStream = try { - fs.open(fileToRead) + fm.open(fileToRead) } catch { case f: FileNotFoundException => throw new IllegalStateException( @@ -412,12 +383,12 @@ private[state] class HDFSBackedStateStoreProvider extends StateStoreProvider wit } private def writeSnapshotFile(version: Long, map: MapType): Unit = { - val fileToWrite = snapshotFile(version) - val tempFile = - new Path(fileToWrite.getParent, s"${fileToWrite.getName}.temp-${Random.nextLong}") + val targetFile = snapshotFile(version) + var rawOutput: CancellableFSDataOutputStream = null var output: DataOutputStream = null - Utils.tryWithSafeFinally { - output = compressStream(fs.create(tempFile, false)) + try { + rawOutput = fm.createAtomic(targetFile, overwrite = true) + output = compressStream(rawOutput) val iter = map.entrySet().iterator() while(iter.hasNext) { val entry = iter.next() @@ -429,16 +400,34 @@ private[state] class HDFSBackedStateStoreProvider extends StateStoreProvider wit output.write(valueBytes) } output.writeInt(-1) - } { - if (output != null) output.close() + output.close() + } catch { + case e: Throwable => + cancelIfPossible(compressedStream = output, rawStream = rawOutput) + throw e } - if (fs.exists(fileToWrite)) { - // Skip rename if the file is alreayd created. - fs.delete(tempFile, true) - } else if (!fs.rename(tempFile, fileToWrite)) { - throw new IOException(s"Failed to rename $tempFile to $fileToWrite") + logInfo(s"Written snapshot file for version $version of $this at $targetFile") + } + + /** + * Try to cancel the underlying stream and safely close the compressed stream. + * + * @param compressedStream the compressed stream. + * @param rawStream the underlying stream which needs to be cancelled. + */ + private def cancelIfPossible( + compressedStream: DataOutputStream, + rawStream: CancellableFSDataOutputStream): Unit = { + try { + if (rawStream != null) rawStream.cancel() + IOUtils.closeQuietly(compressedStream) + } catch { + case e: FSError if e.getCause.isInstanceOf[IOException] => + // Closing the compressedStream causes the stream to write/flush flush data into the + // rawStream. Since the rawStream is already closed, there may be errors. + // Usually its an IOException. However, Hadoop's RawLocalFileSystem wraps + // IOException into FSError. 
} - logInfo(s"Written snapshot file for version $version of $this at $fileToWrite") } private def readSnapshotFile(version: Long): Option[MapType] = { @@ -447,7 +436,7 @@ private[state] class HDFSBackedStateStoreProvider extends StateStoreProvider wit var input: DataInputStream = null try { - input = decompressStream(fs.open(fileToRead)) + input = decompressStream(fm.open(fileToRead)) var eof = false while (!eof) { @@ -508,7 +497,6 @@ private[state] class HDFSBackedStateStoreProvider extends StateStoreProvider wit case None => // The last map is not loaded, probably some other instance is in charge } - } } catch { case NonFatal(e) => @@ -534,7 +522,7 @@ private[state] class HDFSBackedStateStoreProvider extends StateStoreProvider wit } val filesToDelete = files.filter(_.version < earliestFileToRetain.version) filesToDelete.foreach { f => - fs.delete(f.path, true) + fm.delete(f.path) } logInfo(s"Deleted files older than ${earliestFileToRetain.version} for $this: " + filesToDelete.mkString(", ")) @@ -576,7 +564,7 @@ private[state] class HDFSBackedStateStoreProvider extends StateStoreProvider wit /** Fetch all the files that back the store */ private def fetchFiles(): Seq[StoreFile] = { val files: Seq[FileStatus] = try { - fs.listStatus(baseDir) + fm.list(baseDir) } catch { case _: java.io.FileNotFoundException => Seq.empty diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala index d1d9f95cb0977..7eb68c21569ba 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala @@ -459,7 +459,6 @@ object StateStore extends Logging { private def coordinatorRef: Option[StateStoreCoordinatorRef] = loadedProviders.synchronized { val env = SparkEnv.get if (env != null) { - logInfo("Env is not null") val isDriver = env.executorId == SparkContext.DRIVER_IDENTIFIER || env.executorId == SparkContext.LEGACY_DRIVER_IDENTIFIER @@ -467,13 +466,12 @@ object StateStore extends Logging { // as SparkContext + SparkEnv may have been restarted. Hence, when running in driver, // always recreate the reference. if (isDriver || _coordRef == null) { - logInfo("Getting StateStoreCoordinatorRef") + logDebug("Getting StateStoreCoordinatorRef") _coordRef = StateStoreCoordinatorRef.forExecutor(env) } logInfo(s"Retrieved reference to StateStoreCoordinator: ${_coordRef}") Some(_coordRef) } else { - logInfo("Env is null") _coordRef = null None } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManagerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManagerSuite.scala new file mode 100644 index 0000000000000..d02006979ffc9 --- /dev/null +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManagerSuite.scala @@ -0,0 +1,192 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.execution.streaming + +import java.io._ +import java.net.URI + +import scala.util.Random + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs._ + +import org.apache.spark.SparkFunSuite +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.test.SharedSparkSession +import org.apache.spark.util.Utils + +abstract class CheckpointFileManagerTests extends SparkFunSuite { + + def createManager(path: Path): CheckpointFileManager + + test("mkdirs, list, create, rename, delete") { + withTempPath { p => + val basePath = new Path(p.getAbsolutePath) + val fm = createManager(basePath) + // Mkdirs + val dir = new Path(s"$basePath/dir/subdir/subsubdir") + assert(!fm.exists(dir)) + fm.mkdirs(dir) + assert(fm.exists(dir)) + fm.mkdirs(dir) + + // List + val acceptAllFilter = new PathFilter { + override def accept(path: Path): Boolean = true + } + val rejectAllFilter = new PathFilter { + override def accept(path: Path): Boolean = false + } + assert(fm.list(basePath, acceptAllFilter).exists(_.getPath.getName == "dir")) + assert(fm.list(basePath, rejectAllFilter).length === 0) + + // Create + val path = new Path(s"$dir/file") + assert(!fm.exists(path)) + fm.create(path, overwrite = false).close() + assert(fm.exists(path)) + intercept[IOException] { + fm.create(path, overwrite = false) + } + fm.create(path, overwrite = true).close() + + // Open and delete + fm.open(path).close() + fm.delete(path) + assert(!fm.exists(path)) + intercept[IOException] { + fm.open(path) + } + fm.delete(path) // should not throw exception + + // Rename + val path1 = new Path(s"$dir/file1") + val path2 = new Path(s"$dir/file2") + fm.create(path1, overwrite = true).close() + assert(fm.exists(path1)) + fm.rename(path1, path2, overwrite = false) + + val path3 = new Path(s"$dir/file3") + fm.create(path3, overwrite = true).close() + assert(fm.exists(path3)) + intercept[FileAlreadyExistsException] { + fm.rename(path2, path3, overwrite = false) + } + fm.rename(path2, path3, overwrite = true) + } + } + + protected def withTempPath(f: File => Unit): Unit = { + val path = Utils.createTempDir() + path.delete() + try f(path) finally Utils.deleteRecursively(path) + } +} + +class CheckpointFileManagerSuite extends SparkFunSuite with SharedSparkSession { + + test("CheckpointFileManager.create() should pick up user-specified class from conf") { + withSQLConf( + SQLConf.STREAMING_CHECKPOINT_FILE_MANAGER_CLASS.parent.key -> + classOf[TestCheckpointFileManager].getName) { + val fileManager = + CheckpointFileManager.create(new Path("/"), spark.sessionState.newHadoopConf) + assert(fileManager.isInstanceOf[TestCheckpointFileManager]) + } + } + + test("CheckpointFileManager.create() should fallback from FileContext to FileSystem") { + import FakeFileSystem.scheme + spark.conf.set( + s"fs.$scheme.impl", + classOf[FakeFileSystem].getName) + withTempDir { temp => + val metadataLog = new HDFSMetadataLog[String](spark, s"$scheme://${temp.toURI.getPath}") + assert(metadataLog.add(0, "batch0")) + assert(metadataLog.getLatest() === Some(0 -> "batch0")) + 
assert(metadataLog.get(0) === Some("batch0")) + assert(metadataLog.get(None, Some(0)) === Array(0 -> "batch0")) + + + val metadataLog2 = new HDFSMetadataLog[String](spark, s"$scheme://${temp.toURI.getPath}") + assert(metadataLog2.get(0) === Some("batch0")) + assert(metadataLog2.getLatest() === Some(0 -> "batch0")) + assert(metadataLog2.get(None, Some(0)) === Array(0 -> "batch0")) + + } + } +} + +class FileContextBasedCheckpointFileManagerSuite extends CheckpointFileManagerTests { + override def createManager(path: Path): CheckpointFileManager = { + new FileContextBasedCheckpointFileManager(path, new Configuration()) + } +} + +class FileSystemBasedCheckpointFileManagerSuite extends CheckpointFileManagerTests { + override def createManager(path: Path): CheckpointFileManager = { + new FileSystemBasedCheckpointFileManager(path, new Configuration()) + } +} + + +/** A fake implementation to test different characteristics of CheckpointFileManager interface */ +class TestCheckpointFileManager(path: Path, hadoopConf: Configuration) + extends FileSystemBasedCheckpointFileManager(path, hadoopConf) { + + import CheckpointFileManager._ + + override def createAtomic(path: Path, overwrite: Boolean): CancellableFSDataOutputStream = { + if (TestCheckpointFileManager.shouldFailInCreateAtomic) { + TestCheckpointFileManager.cancelCalledInCreateAtomic = false + } + val originalOut = super.createAtomic(path, overwrite) + + new CancellableFSDataOutputStream(originalOut) { + override def close(): Unit = { + if (TestCheckpointFileManager.shouldFailInCreateAtomic) { + throw new IOException("Copy failed intentionally") + } + super.close() + } + + override def cancel(): Unit = { + TestCheckpointFileManager.cancelCalledInCreateAtomic = true + originalOut.cancel() + } + } + } +} + +object TestCheckpointFileManager { + @volatile var shouldFailInCreateAtomic = false + @volatile var cancelCalledInCreateAtomic = false +} + + +/** FakeFileSystem to test fallback of the HDFSMetadataLog from FileContext to FileSystem API */ +private class FakeFileSystem extends RawLocalFileSystem { + import FakeFileSystem.scheme + + override def getUri: URI = { + URI.create(s"$scheme:///") + } +} + +private object FakeFileSystem { + val scheme = s"HDFSMetadataLogSuite${math.abs(Random.nextInt)}" +} diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala index 4677769c12a35..610f0cab955b0 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala @@ -17,21 +17,19 @@ package org.apache.spark.sql.execution.streaming -import java.io.{File, FileNotFoundException, IOException} +import java.io.File import java.net.URI import java.util.ConcurrentModificationException import scala.language.implicitConversions import scala.util.Random -import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs._ import org.scalatest.concurrent.Waiters._ import org.scalatest.time.SpanSugar._ import org.apache.spark.SparkFunSuite import org.apache.spark.sql.execution.streaming.FakeFileSystem._ -import org.apache.spark.sql.execution.streaming.HDFSMetadataLog.{FileContextManager, FileManager, FileSystemManager} import org.apache.spark.sql.test.SharedSQLContext import org.apache.spark.util.UninterruptibleThread @@ -43,20 +41,6 @@ class HDFSMetadataLogSuite extends SparkFunSuite 
with SharedSQLContext { private implicit def toOption[A](a: A): Option[A] = Option(a) - test("FileManager: FileContextManager") { - withTempDir { temp => - val path = new Path(temp.getAbsolutePath) - testFileManager(path, new FileContextManager(path, new Configuration)) - } - } - - test("FileManager: FileSystemManager") { - withTempDir { temp => - val path = new Path(temp.getAbsolutePath) - testFileManager(path, new FileSystemManager(path, new Configuration)) - } - } - test("HDFSMetadataLog: basic") { withTempDir { temp => val dir = new File(temp, "dir") // use non-existent directory to test whether log make the dir @@ -82,26 +66,6 @@ class HDFSMetadataLogSuite extends SparkFunSuite with SharedSQLContext { } } - testQuietly("HDFSMetadataLog: fallback from FileContext to FileSystem") { - spark.conf.set( - s"fs.$scheme.impl", - classOf[FakeFileSystem].getName) - withTempDir { temp => - val metadataLog = new HDFSMetadataLog[String](spark, s"$scheme://${temp.toURI.getPath}") - assert(metadataLog.add(0, "batch0")) - assert(metadataLog.getLatest() === Some(0 -> "batch0")) - assert(metadataLog.get(0) === Some("batch0")) - assert(metadataLog.get(None, Some(0)) === Array(0 -> "batch0")) - - - val metadataLog2 = new HDFSMetadataLog[String](spark, s"$scheme://${temp.toURI.getPath}") - assert(metadataLog2.get(0) === Some("batch0")) - assert(metadataLog2.getLatest() === Some(0 -> "batch0")) - assert(metadataLog2.get(None, Some(0)) === Array(0 -> "batch0")) - - } - } - test("HDFSMetadataLog: purge") { withTempDir { temp => val metadataLog = new HDFSMetadataLog[String](spark, temp.getAbsolutePath) @@ -206,60 +170,6 @@ class HDFSMetadataLogSuite extends SparkFunSuite with SharedSQLContext { } } - /** Basic test case for [[FileManager]] implementation. */ - private def testFileManager(basePath: Path, fm: FileManager): Unit = { - // Mkdirs - val dir = new Path(s"$basePath/dir/subdir/subsubdir") - assert(!fm.exists(dir)) - fm.mkdirs(dir) - assert(fm.exists(dir)) - fm.mkdirs(dir) - - // List - val acceptAllFilter = new PathFilter { - override def accept(path: Path): Boolean = true - } - val rejectAllFilter = new PathFilter { - override def accept(path: Path): Boolean = false - } - assert(fm.list(basePath, acceptAllFilter).exists(_.getPath.getName == "dir")) - assert(fm.list(basePath, rejectAllFilter).length === 0) - - // Create - val path = new Path(s"$dir/file") - assert(!fm.exists(path)) - fm.create(path).close() - assert(fm.exists(path)) - intercept[IOException] { - fm.create(path) - } - - // Open and delete - fm.open(path).close() - fm.delete(path) - assert(!fm.exists(path)) - intercept[IOException] { - fm.open(path) - } - fm.delete(path) // should not throw exception - - // Rename - val path1 = new Path(s"$dir/file1") - val path2 = new Path(s"$dir/file2") - fm.create(path1).close() - assert(fm.exists(path1)) - fm.rename(path1, path2) - intercept[FileNotFoundException] { - fm.rename(path1, path2) - } - val path3 = new Path(s"$dir/file3") - fm.create(path3).close() - assert(fm.exists(path3)) - intercept[FileAlreadyExistsException] { - fm.rename(path2, path3) - } - } - test("verifyBatchIds") { import HDFSMetadataLog.verifyBatchIds verifyBatchIds(Seq(1L, 2L, 3L), Some(1L), Some(3L)) @@ -277,14 +187,3 @@ class HDFSMetadataLogSuite extends SparkFunSuite with SharedSQLContext { intercept[IllegalStateException](verifyBatchIds(Seq(1, 2, 4, 5), Some(1L), Some(5L))) } } - -/** FakeFileSystem to test fallback of the HDFSMetadataLog from FileContext to FileSystem API */ -class FakeFileSystem extends 
RawLocalFileSystem { - override def getUri: URI = { - URI.create(s"$scheme:///") - } -} - -object FakeFileSystem { - val scheme = s"HDFSMetadataLogSuite${math.abs(Random.nextInt)}" -} diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala index c843b65020d8c..276cff24511da 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala @@ -20,7 +20,6 @@ package org.apache.spark.sql.execution.streaming.state import java.io.{File, IOException} import java.net.URI import java.util.UUID -import java.util.concurrent.ConcurrentHashMap import scala.collection.JavaConverters._ import scala.collection.mutable @@ -28,17 +27,17 @@ import scala.util.Random import org.apache.commons.io.FileUtils import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.fs.{FileStatus, Path, RawLocalFileSystem} +import org.apache.hadoop.fs._ import org.scalatest.{BeforeAndAfter, PrivateMethodTester} import org.scalatest.concurrent.Eventually._ import org.scalatest.time.SpanSugar._ -import org.apache.spark.{SparkConf, SparkContext, SparkEnv, SparkFunSuite} +import org.apache.spark._ import org.apache.spark.LocalSparkContext._ import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, UnsafeProjection, UnsafeRow} import org.apache.spark.sql.catalyst.util.quietly -import org.apache.spark.sql.execution.streaming.{MemoryStream, StreamingQueryWrapper} +import org.apache.spark.sql.execution.streaming._ import org.apache.spark.sql.functions.count import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types._ @@ -138,7 +137,7 @@ class StateStoreSuite extends StateStoreSuiteBase[HDFSBackedStateStoreProvider] assert(getData(provider, 19) === Set("a" -> 19)) } - test("SPARK-19677: Committing a delta file atop an existing one should not fail on HDFS") { + testQuietly("SPARK-19677: Committing a delta file atop an existing one should not fail on HDFS") { val conf = new Configuration() conf.set("fs.fake.impl", classOf[RenameLikeHDFSFileSystem].getName) conf.set("fs.defaultFS", "fake:///") @@ -344,7 +343,7 @@ class StateStoreSuite extends StateStoreSuiteBase[HDFSBackedStateStoreProvider] } } - test("SPARK-18342: commit fails when rename fails") { + testQuietly("SPARK-18342: commit fails when rename fails") { import RenameReturnsFalseFileSystem._ val dir = scheme + "://" + newDir() val conf = new Configuration() @@ -366,7 +365,7 @@ class StateStoreSuite extends StateStoreSuiteBase[HDFSBackedStateStoreProvider] def numTempFiles: Int = { if (deltaFileDir.exists) { - deltaFileDir.listFiles.map(_.getName).count(n => n.contains("temp") && !n.startsWith(".")) + deltaFileDir.listFiles.map(_.getName).count(n => n.endsWith(".tmp")) } else 0 } @@ -471,6 +470,41 @@ class StateStoreSuite extends StateStoreSuiteBase[HDFSBackedStateStoreProvider] } } + test("error writing [version].delta cancels the output stream") { + + val hadoopConf = new Configuration() + hadoopConf.set( + SQLConf.STREAMING_CHECKPOINT_FILE_MANAGER_CLASS.parent.key, + classOf[TestCheckpointFileManager].getName) + val remoteDir = Utils.createTempDir().getAbsolutePath + + val provider = newStoreProvider( + opId = Random.nextInt, partition = 0, dir = remoteDir, hadoopConf = hadoopConf) + + // Disable failure of output 
stream and generate versions
+    TestCheckpointFileManager.shouldFailInCreateAtomic = false
+    for (version <- 1 to 10) {
+      val store = provider.getStore(version - 1)
+      put(store, version.toString, version) // update "1" -> 1, "2" -> 2, ...
+      store.commit()
+    }
+    val version10Data = (1L to 10).map(_.toString).map(x => x -> x).toSet
+
+    val store = provider.getStore(10)
+    // Fail commit for next version and verify that reloading resets the files
+    TestCheckpointFileManager.shouldFailInCreateAtomic = true
+    put(store, "11", 11)
+    val e = intercept[IllegalStateException] { quietly { store.commit() } }
+    assert(e.getCause.isInstanceOf[IOException], "Expected an IOException to be thrown")
+    TestCheckpointFileManager.shouldFailInCreateAtomic = false
+
+    // Abort commit for next version and verify that reloading resets the files
+    val store2 = provider.getStore(10)
+    put(store2, "11", 11)
+    store2.abort()
+    assert(TestCheckpointFileManager.cancelCalledInCreateAtomic)
+  }
+
   override def newStoreProvider(): HDFSBackedStateStoreProvider = {
     newStoreProvider(opId = Random.nextInt(), partition = 0)
   }
@@ -720,6 +754,14 @@ abstract class StateStoreSuiteBase[ProviderClass <: StateStoreProvider]
    * this provider
    */
   def getData(storeProvider: ProviderClass, version: Int): Set[(String, Int)]
+
+  protected def testQuietly(name: String)(f: => Unit): Unit = {
+    test(name) {
+      quietly {
+        f
+      }
+    }
+  }
 }
 
 object StateStoreTestsHelper {

From f1fc175a3e599c8b72a2c07dcd97bdc3d43e2092 Mon Sep 17 00:00:00 2001
From: Tathagata Das
Date: Wed, 11 Apr 2018 17:02:05 -0700
Subject: [PATCH 2/9] Reduced the public interface further and avoid file deletes in rename

---
 .../streaming/CheckpointFileManager.scala     | 186 ++++++++----------
 .../execution/streaming/HDFSMetadataLog.scala |   2 +-
 .../state/HDFSBackedStateStoreProvider.scala  |   4 +-
 .../CheckpointFileManagerSuite.scala          |  27 +--
 4 files changed, 88 insertions(+), 131 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManager.scala
index e5338dff4c57e..52024110c9d82 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManager.scala
@@ -28,6 +28,7 @@ import org.apache.hadoop.fs.local.{LocalFs, RawLocalFs}
 import org.apache.hadoop.fs.permission.FsPermission
 
 import org.apache.spark.internal.Logging
+import org.apache.spark.sql.execution.streaming.CheckpointFileManager.RenameHelperMethods
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.util.Utils
 
@@ -49,6 +50,21 @@ trait CheckpointFileManager {
 
   import org.apache.spark.sql.execution.streaming.CheckpointFileManager._
 
+  /**
+   * Create a file and make its contents available atomically after the output stream is closed.
+   *
+   * @param path Path to create
+   * @param overwriteIfPossible If true, then the implementations must do a best-effort attempt to
+   *                            overwrite the file if it already exists. It should not throw
+   *                            any exception if the file exists. However, if false, then the
+   *                            implementation must not overwrite if the file already exists and
+   *                            must throw `FileAlreadyExistsException` in that case.
+   */
+  def createAtomic(path: Path, overwriteIfPossible: Boolean): CancellableFSDataOutputStream
+
+  /** Open a file for reading, or throw exception if it does not exist. */
+  def open(path: Path): FSDataInputStream
+
   /** List the files in a path that match a filter. */
   def list(path: Path, filter: PathFilter): Array[FileStatus]
 
@@ -63,33 +79,37 @@ trait CheckpointFileManager {
 
   /** Whether path exists */
   def exists(path: Path): Boolean
 
-  /** Create a file. */
-  def create(path: Path, overwrite: Boolean): FSDataOutputStream
-
-  /** Create a file and make its contents available atomically after the output stream is closed. */
-  def createAtomic(path: Path, overwrite: Boolean): CancellableFSDataOutputStream
-
-  /** Open a file for reading, or throw exception if it does not exist. */
-  def open(path: Path): FSDataInputStream
-
-  /** Rename a file. */
-  def rename(srcPath: Path, dstPath: Path, overwrite: Boolean): Unit
-
   /** Recursively delete a path if it exists. Should not throw exception if file doesn't exist. */
   def delete(path: Path): Unit
 
-  /** Copy a local file to a remote file. */
-  def copyFromLocalFile(localSrcFile: File, destPath: Path): Unit
-
-  /** Copy a remote file to the local file. */
-  def copyToLocalFile(srcPath: Path, localDestFile: File): Unit
-
   /** Is the default file system this implementation is operating on the local file system. */
   def isLocal: Boolean
 }
 
 object CheckpointFileManager extends Logging {
 
+  /**
+   * Additional methods in CheckpointFileManager implementations that allow
+   * [[RenameBasedFSDataOutputStream]] to get atomicity by write-to-temp-file-and-rename
+   */
+  sealed trait RenameHelperMethods { self => CheckpointFileManager
+    /** Create a file with overwrite. */
+    def create(path: Path): FSDataOutputStream
+
+    /**
+     * Rename a file.
+     *
+     * @param srcPath Source path to rename
+     * @param dstPath Destination path to rename to
+     * @param overwriteIfPossible If true, then the implementations must do a best-effort attempt to
+     *                            overwrite the file if it already exists. It should not throw
+     *                            any exception if the file exists. However, if false, then the
+     *                            implementation must not overwrite if the file already exists and
+     *                            must throw `FileAlreadyExistsException` in that case.
+     */
+    def rename(srcPath: Path, dstPath: Path, overwriteIfPossible: Boolean): Unit
+  }
+
   /**
    * An interface to add the cancel() operation to [[FSDataOutputStream]]. This is used
    * mainly by `CheckpointFileManager.createAtomic` to write a file atomically.
@@ -107,13 +127,13 @@ object CheckpointFileManager extends Logging {
    * to a temporary file and then renames.
    */
   sealed class RenameBasedFSDataOutputStream(
-      fm: CheckpointFileManager,
+      fm: CheckpointFileManager with RenameHelperMethods,
       finalPath: Path,
       tempPath: Path,
-      overwrite: Boolean)
-    extends CancellableFSDataOutputStream(fm.create(tempPath, overwrite)) {
+      overwriteIfPossible: Boolean)
+    extends CancellableFSDataOutputStream(fm.create(tempPath)) {
 
-    def this(fm: CheckpointFileManager, path: Path, overwrite: Boolean) = {
+    def this(fm: CheckpointFileManager with RenameHelperMethods, path: Path, overwrite: Boolean) = {
       this(fm, path, generateTempPath(path), overwrite)
     }
 
@@ -124,7 +144,7 @@ object CheckpointFileManager extends Logging {
       try {
         if (terminated) return
         super.close()
-        fm.rename(tempPath, finalPath, overwrite)
+        fm.rename(tempPath, finalPath, overwriteIfPossible)
         logInfo(s"Renamed temp file $tempPath to $finalPath")
       } finally {
         terminated = true
@@ -164,12 +184,12 @@ object CheckpointFileManager extends Logging {
     } catch {
       case e: UnsupportedFileSystemException =>
         logWarning(
-          "Could not use FileContext API for managing metadata log files at path " +
-            s"$path. Using FileSystem API instead for managing log files. The log may be " +
-            s"inconsistent under failures.")
+          "Could not use FileContext API for managing Structured Streaming checkpoint files at " +
+            s"$path. Using FileSystem API instead for managing log files. If the implementation " +
+            s"of FileSystem.rename() is not atomic, then the correctness and fault-tolerance of " +
+            s"your Structured Streaming is not guaranteed.")
         new FileSystemBasedCheckpointFileManager(path, hadoopConf)
     }
-    new FileSystemBasedCheckpointFileManager(path, hadoopConf)
   }
 
   private def generateTempPath(path: Path): Path = {
@@ -182,7 +202,7 @@
 /** An implementation of [[CheckpointFileManager]] using Hadoop's [[FileSystem]] API. */
 class FileSystemBasedCheckpointFileManager(path: Path, hadoopConf: Configuration)
-  extends CheckpointFileManager with Logging {
+  extends CheckpointFileManager with RenameHelperMethods with Logging {
 
   import CheckpointFileManager._
 
@@ -199,12 +219,13 @@ class FileSystemBasedCheckpointFileManager(path: Path, hadoopConf: Configuration
     fs.mkdirs(path, FsPermission.getDirDefault)
   }
 
-  override def create(path: Path, overwrite: Boolean): FSDataOutputStream = {
-    fs.create(path, overwrite)
+  override def create(path: Path): FSDataOutputStream = {
+    fs.create(path, true)
   }
 
-  override def createAtomic(path: Path, overwrite: Boolean): CancellableFSDataOutputStream = {
-    new RenameBasedFSDataOutputStream(this, path, overwrite)
+  override def createAtomic(
+      path: Path, overwriteIfPossible: Boolean): CancellableFSDataOutputStream = {
+    new RenameBasedFSDataOutputStream(this, path, overwriteIfPossible)
   }
 
   override def open(path: Path): FSDataInputStream = {
@@ -215,47 +236,33 @@ class FileSystemBasedCheckpointFileManager(path: Path, hadoopConf: Configuration
     fs.exists(path)
   }
 
-  override def rename(srcPath: Path, dstPath: Path, overwrite: Boolean): Unit = {
-    if (!overwrite && fs.exists(dstPath)) {
+  override def rename(srcPath: Path, dstPath: Path, overwriteIfPossible: Boolean): Unit = {
+    if (!overwriteIfPossible && fs.exists(dstPath)) {
       throw new FileAlreadyExistsException(
         s"Failed to rename $srcPath to $dstPath as destination already exists")
     }
 
-    def deleteAndRename(prevException: Exception): Unit = {
-      if (overwrite) {
-        try {
-          if (fs.delete(dstPath, true)) {
-            logWarning(s"Failed to delete $dstPath before second attempt to rename")
-          }
-          if (!fs.rename(srcPath, dstPath)) {
-            val msg = s"Failed to rename temp file $srcPath to $dstPath as second attempt to" +
-              s"rename (after delete) returned false"
-            logWarning(msg)
-            val e = new IOException(msg)
-            e.addSuppressed(prevException)
-            throw e
-          }
-        } catch {
-          case NonFatal(e) =>
-            logError(s"Failed to write atomically to $dstPath", e)
-            if (prevException != null) e.addSuppressed(prevException)
-            throw e
-        }
-      } else {
-        throw prevException
-      }
-    }
-
     try {
       if (!fs.rename(srcPath, dstPath)) {
-        val msg = s"Failed to rename temp file $srcPath to $dstPath as rename returned false"
-        logWarning(msg)
-        deleteAndRename(new IOException(msg))
+        if (fs.exists(dstPath) && !overwriteIfPossible) {
+          // Some implementations of FileSystem may not throw FileAlreadyExistsException but
+          // only return false if file already exists. Explicitly throw the error.
+          // Note that this is definitely not atomic, so this is only a best-effort attempt
+          // to throw the most appropriate exception when rename returned false.
+ throw new FileAlreadyExistsException(s"$dstPath already exists") + } else { + val msg = s"Failed to rename temp file $srcPath to $dstPath as rename returned false" + logWarning(msg) + throw new IOException(msg) + } } } catch { case fe: FileAlreadyExistsException => + // Some implementation of FileSystem can directly throw FileAlreadyExistsException if file + // already exists. Ignore the error if overwriteIfPossible = true as it is expected to be + // best effort. logWarning(s"Failed to rename temp file $srcPath to $dstPath because file exists", fe) - deleteAndRename(fe) + if (!overwriteIfPossible) throw fe } } @@ -269,14 +276,6 @@ class FileSystemBasedCheckpointFileManager(path: Path, hadoopConf: Configuration } } - override def copyFromLocalFile(localSrcFile: File, destPath: Path): Unit = { - fs.copyFromLocalFile(new Path(localSrcFile.getAbsoluteFile.toURI), destPath) - } - - override def copyToLocalFile(srcPath: Path, localDestFile: File): Unit = { - fs.copyToLocalFile(srcPath, new Path(localDestFile.getAbsoluteFile.toURI)) - } - override def isLocal: Boolean = fs match { case _: LocalFileSystem | _: RawLocalFileSystem => true case _ => false @@ -286,7 +285,7 @@ class FileSystemBasedCheckpointFileManager(path: Path, hadoopConf: Configuration /** An implementation of [[CheckpointFileManager]] using Hadoop's [[FileContext]] API. */ class FileContextBasedCheckpointFileManager(path: Path, hadoopConf: Configuration) - extends CheckpointFileManager with Logging { + extends CheckpointFileManager with RenameHelperMethods with Logging { import CheckpointFileManager._ @@ -304,14 +303,14 @@ class FileContextBasedCheckpointFileManager(path: Path, hadoopConf: Configuratio fc.mkdir(path, FsPermission.getDirDefault, true) } - override def create(path: Path, overwrite: Boolean): FSDataOutputStream = { + override def create(path: Path): FSDataOutputStream = { import CreateFlag._ - val flags = if (overwrite) EnumSet.of(CREATE, OVERWRITE) else EnumSet.of(CREATE) - fc.create(path, flags) + fc.create(path, EnumSet.of(CREATE, OVERWRITE)) } - override def createAtomic(path: Path, overwrite: Boolean): CancellableFSDataOutputStream = { - new RenameBasedFSDataOutputStream(this, path, overwrite) + override def createAtomic( + path: Path, overwriteIfPossible: Boolean): CancellableFSDataOutputStream = { + new RenameBasedFSDataOutputStream(this, path, overwriteIfPossible) } override def open(path: Path): FSDataInputStream = { @@ -322,9 +321,9 @@ class FileContextBasedCheckpointFileManager(path: Path, hadoopConf: Configuratio fc.util.exists(path) } - override def rename(srcPath: Path, dstPath: Path, overwrite: Boolean): Unit = { + override def rename(srcPath: Path, dstPath: Path, overwriteIfPossible: Boolean): Unit = { import Options.Rename._ - fc.rename(srcPath, dstPath, if (overwrite) OVERWRITE else NONE) + fc.rename(srcPath, dstPath, if (overwriteIfPossible) OVERWRITE else NONE) } @@ -337,35 +336,6 @@ class FileContextBasedCheckpointFileManager(path: Path, hadoopConf: Configuratio } } - override def copyFromLocalFile(localSrcFile: File, destPath: Path): Unit = { - val localFc = FileContext.getLocalFSFileContext - var in: InputStream = null - var out: OutputStream = null - try { - in = localFc.open(new Path(localSrcFile.getAbsoluteFile.toURI)) - out = fc.create(destPath, EnumSet.of(CreateFlag.CREATE)) - IOUtils.copyLarge(in, out) - } finally { - if (in != null) in.close() - if (out != null) out.close() - } - } - - override def copyToLocalFile(srcPath: Path, localDstFile: File): Unit = { - val localFc = 
FileContext.getLocalFSFileContext - var in: InputStream = null - var out: OutputStream = null - try { - in = fc.open(srcPath) - out = localFc.create( - new Path(localDstFile.getAbsoluteFile.toURI), EnumSet.of(CreateFlag.CREATE)) - IOUtils.copyLarge(in, out) - } finally { - if (in != null) in.close() - if (out != null) out.close() - } - } - override def isLocal: Boolean = fc.getDefaultFileSystem match { case _: LocalFs | _: RawLocalFs => true // LocalFs = RawLocalFs + ChecksumFs case _ => false diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala index faae8cbbccd5a..4acb221f4c12c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala @@ -120,7 +120,7 @@ class HDFSMetadataLog[T <: AnyRef : ClassTag](sparkSession: SparkSession, path: * valid behavior, we still need to prevent it from destroying the files. */ private def writeBatchToFile(metadata: T, path: Path): Unit = { - val output = fileManager.createAtomic(path, overwrite = false) + val output = fileManager.createAtomic(path, overwriteIfPossible = false) try { serialize(metadata, output) output.close() diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala index 63158fe4f8bfa..9e6d01dbc01a1 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala @@ -92,7 +92,7 @@ private[state] class HDFSBackedStateStoreProvider extends StateStoreProvider wit private val newVersion = version + 1 @volatile private var state: STATE = UPDATING private val finalDeltaFile: Path = deltaFile(newVersion) - private lazy val deltaFileStream = fm.createAtomic(finalDeltaFile, overwrite = true) + private lazy val deltaFileStream = fm.createAtomic(finalDeltaFile, overwriteIfPossible = true) private lazy val compressedStream = compressStream(deltaFileStream) override def id: StateStoreId = HDFSBackedStateStoreProvider.this.stateStoreId @@ -387,7 +387,7 @@ private[state] class HDFSBackedStateStoreProvider extends StateStoreProvider wit var rawOutput: CancellableFSDataOutputStream = null var output: DataOutputStream = null try { - rawOutput = fm.createAtomic(targetFile, overwrite = true) + rawOutput = fm.createAtomic(targetFile, overwriteIfPossible = true) output = compressStream(rawOutput) val iter = map.entrySet().iterator() while(iter.hasNext) { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManagerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManagerSuite.scala index d02006979ffc9..fa5875954a19e 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManagerSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManagerSuite.scala @@ -33,7 +33,7 @@ abstract class CheckpointFileManagerTests extends SparkFunSuite { def createManager(path: Path): CheckpointFileManager - test("mkdirs, list, create, rename, delete") { + test("mkdirs, list, createAtomic, open, delete") { withTempPath { p => val 
basePath = new Path(p.getAbsolutePath) val fm = createManager(basePath) @@ -54,15 +54,17 @@ abstract class CheckpointFileManagerTests extends SparkFunSuite { assert(fm.list(basePath, acceptAllFilter).exists(_.getPath.getName == "dir")) assert(fm.list(basePath, rejectAllFilter).length === 0) - // Create + // Create atomic and exists val path = new Path(s"$dir/file") assert(!fm.exists(path)) - fm.create(path, overwrite = false).close() + fm.createAtomic(path, overwriteIfPossible = false).cancel() + assert(!fm.exists(path)) + fm.createAtomic(path, overwriteIfPossible = false).close() assert(fm.exists(path)) intercept[IOException] { - fm.create(path, overwrite = false) + fm.createAtomic(path, overwriteIfPossible = false).close() } - fm.create(path, overwrite = true).close() + fm.createAtomic(path, overwriteIfPossible = true).close() // should not throw exception // Open and delete fm.open(path).close() @@ -72,21 +74,6 @@ abstract class CheckpointFileManagerTests extends SparkFunSuite { fm.open(path) } fm.delete(path) // should not throw exception - - // Rename - val path1 = new Path(s"$dir/file1") - val path2 = new Path(s"$dir/file2") - fm.create(path1, overwrite = true).close() - assert(fm.exists(path1)) - fm.rename(path1, path2, overwrite = false) - - val path3 = new Path(s"$dir/file3") - fm.create(path3, overwrite = true).close() - assert(fm.exists(path3)) - intercept[FileAlreadyExistsException] { - fm.rename(path2, path3, overwrite = false) - } - fm.rename(path2, path3, overwrite = true) } } From f9965f1cecf58b9aa79e0a746e6e16580238ef7b Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Wed, 11 Apr 2018 17:16:25 -0700 Subject: [PATCH 3/9] More improvements --- .../execution/streaming/CheckpointFileManager.scala | 4 ++-- .../state/HDFSBackedStateStoreProvider.scala | 6 +++--- .../streaming/CheckpointFileManagerSuite.scala | 13 +++++++++++-- 3 files changed, 16 insertions(+), 7 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManager.scala index 52024110c9d82..331c85365f78e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManager.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManager.scala @@ -243,8 +243,8 @@ class FileSystemBasedCheckpointFileManager(path: Path, hadoopConf: Configuration } try { - if (!fs.rename(srcPath, dstPath)) { - if (fs.exists(dstPath) && !overwriteIfPossible) { + if (!fs.rename(srcPath, dstPath) && !overwriteIfPossible) { + if (fs.exists(dstPath)) { // Some implementations of FileSystem may not throw FileAlreadyExistsException but // only return false if file already exists. Explicitly throw the error. // Note that this is definitely not atomic, so this is only a best-effort attempt diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala index 9e6d01dbc01a1..7679d243c0922 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala @@ -148,7 +148,7 @@ private[state] class HDFSBackedStateStoreProvider extends StateStoreProvider wit // the other used for read+write. 
We don't want the read-only to delete state files. if (state == UPDATING) { state = ABORTED - cancelIfPossible(compressedStream, deltaFileStream) + cancelDeltaFile(compressedStream, deltaFileStream) } else { state = ABORTED } @@ -403,7 +403,7 @@ private[state] class HDFSBackedStateStoreProvider extends StateStoreProvider wit output.close() } catch { case e: Throwable => - cancelIfPossible(compressedStream = output, rawStream = rawOutput) + cancelDeltaFile(compressedStream = output, rawStream = rawOutput) throw e } logInfo(s"Written snapshot file for version $version of $this at $targetFile") @@ -415,7 +415,7 @@ private[state] class HDFSBackedStateStoreProvider extends StateStoreProvider wit * @param compressedStream the compressed stream. * @param rawStream the underlying stream which needs to be cancelled. */ - private def cancelIfPossible( + private def cancelDeltaFile( compressedStream: DataOutputStream, rawStream: CancellableFSDataOutputStream): Unit = { try { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManagerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManagerSuite.scala index fa5875954a19e..185b96c83e285 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManagerSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManagerSuite.scala @@ -54,16 +54,25 @@ abstract class CheckpointFileManagerTests extends SparkFunSuite { assert(fm.list(basePath, acceptAllFilter).exists(_.getPath.getName == "dir")) assert(fm.list(basePath, rejectAllFilter).length === 0) - // Create atomic and exists - val path = new Path(s"$dir/file") + // Create atomic without overwrite + var path = new Path(s"$dir/file") assert(!fm.exists(path)) fm.createAtomic(path, overwriteIfPossible = false).cancel() assert(!fm.exists(path)) fm.createAtomic(path, overwriteIfPossible = false).close() assert(fm.exists(path)) intercept[IOException] { + // should throw exception since file exists and overwrite is false fm.createAtomic(path, overwriteIfPossible = false).close() } + + // Create atomic with overwrite if possible + path = new Path(s"$dir/file2") + assert(!fm.exists(path)) + fm.createAtomic(path, overwriteIfPossible = true).cancel() + assert(!fm.exists(path)) + fm.createAtomic(path, overwriteIfPossible = true).close() + assert(fm.exists(path)) fm.createAtomic(path, overwriteIfPossible = true).close() // should not throw exception // Open and delete From 1b23492017b8209b6198e6886e06a5cd9d59b9db Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Wed, 11 Apr 2018 19:09:52 -0700 Subject: [PATCH 4/9] Fixed more tests --- .../streaming/CheckpointFileManager.scala | 64 ++++++++++--------- .../CheckpointFileManagerSuite.scala | 39 +++++------ .../streaming/HDFSMetadataLogSuite.scala | 5 +- 3 files changed, 57 insertions(+), 51 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManager.scala index 331c85365f78e..5d3faed525770 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManager.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManager.scala @@ -94,7 +94,7 @@ object CheckpointFileManager extends Logging { */ sealed trait RenameHelperMethods { self => CheckpointFileManager /** Create a file with overwrite. 
*/ - def create(path: Path): FSDataOutputStream + def createTempFile(path: Path): FSDataOutputStream /** * Rename a file. @@ -107,7 +107,7 @@ object CheckpointFileManager extends Logging { * implementation must not overwrite if the file alraedy exists and * must throw `FileAlreadyExistsException` in that case. */ - def rename(srcPath: Path, dstPath: Path, overwriteIfPossible: Boolean): Unit + def renameTempFile(srcPath: Path, dstPath: Path, overwriteIfPossible: Boolean): Unit } /** @@ -131,7 +131,7 @@ object CheckpointFileManager extends Logging { finalPath: Path, tempPath: Path, overwriteIfPossible: Boolean) - extends CancellableFSDataOutputStream(fm.create(tempPath)) { + extends CancellableFSDataOutputStream(fm.createTempFile(tempPath)) { def this(fm: CheckpointFileManager with RenameHelperMethods, path: Path, overwrite: Boolean) = { this(fm, path, generateTempPath(path), overwrite) @@ -143,8 +143,15 @@ object CheckpointFileManager extends Logging { override def close(): Unit = synchronized { try { if (terminated) return - super.close() - fm.rename(tempPath, finalPath, overwriteIfPossible) + underlyingStream.close() + try { + fm.renameTempFile(tempPath, finalPath, overwriteIfPossible) + } catch { + case fe: FileAlreadyExistsException => + logWarning( + s"Failed to rename temp file $tempPath to $finalPath because file exists", fe) + if (!overwriteIfPossible) throw fe + } logInfo(s"Renamed temp file $tempPath to $finalPath") } finally { terminated = true @@ -208,9 +215,6 @@ class FileSystemBasedCheckpointFileManager(path: Path, hadoopConf: Configuration protected val fs = path.getFileSystem(hadoopConf) - fs.setVerifyChecksum(false) - fs.setWriteChecksum(false) - override def list(path: Path, filter: PathFilter): Array[FileStatus] = { fs.listStatus(path, filter) } @@ -219,7 +223,7 @@ class FileSystemBasedCheckpointFileManager(path: Path, hadoopConf: Configuration fs.mkdirs(path, FsPermission.getDirDefault) } - override def create(path: Path): FSDataOutputStream = { + override def createTempFile(path: Path): FSDataOutputStream = { fs.create(path, true) } @@ -236,33 +240,29 @@ class FileSystemBasedCheckpointFileManager(path: Path, hadoopConf: Configuration fs.exists(path) } - override def rename(srcPath: Path, dstPath: Path, overwriteIfPossible: Boolean): Unit = { + override def renameTempFile(srcPath: Path, dstPath: Path, overwriteIfPossible: Boolean): Unit = { if (!overwriteIfPossible && fs.exists(dstPath)) { throw new FileAlreadyExistsException( s"Failed to rename $srcPath to $dstPath as destination already exists") } - try { - if (!fs.rename(srcPath, dstPath) && !overwriteIfPossible) { - if (fs.exists(dstPath)) { - // Some implementations of FileSystem may not throw FileAlreadyExistsException but - // only return false if file already exists. Explicitly throw the error. - // Note that this is definitely not atomic, so this is only a best-effort attempt - // to throw the most appropriate exception when rename returned false. + if (!fs.rename(srcPath, dstPath)) { + // If overwriteIfPossible = false, then we want to find out why the rename failed and + // try to throw the right error. + if (fs.exists(dstPath)) { + // Some implementations of FileSystem may only return false instead of throwing + // FileAlreadyExistsException. In that case, explicitly throw the error the error + // if overwriteIfPossible = false. Note that this is definitely not atomic. + // This is only a best-effort attempt to identify the situation when rename returned + // false. 
+ if (!overwriteIfPossible) { throw new FileAlreadyExistsException(s"$dstPath already exists") - } else { - val msg = s"Failed to rename temp file $srcPath to $dstPath as rename returned false" - logWarning(msg) - throw new IOException(msg) } + } else { + val msg = s"Failed to rename temp file $srcPath to $dstPath as rename returned false" + logWarning(msg) + throw new IOException(msg) } - } catch { - case fe: FileAlreadyExistsException => - // Some implementation of FileSystem can directly throw FileAlreadyExistsException if file - // already exists. Ignore the error if overwriteIfPossible = true as it is expected to be - // best effort. - logWarning(s"Failed to rename temp file $srcPath to $dstPath because file exists", fe) - if (!overwriteIfPossible) throw fe } } @@ -303,9 +303,11 @@ class FileContextBasedCheckpointFileManager(path: Path, hadoopConf: Configuratio fc.mkdir(path, FsPermission.getDirDefault, true) } - override def create(path: Path): FSDataOutputStream = { + override def createTempFile(path: Path): FSDataOutputStream = { import CreateFlag._ - fc.create(path, EnumSet.of(CREATE, OVERWRITE)) + import Options._ + fc.create( + path, EnumSet.of(CREATE, OVERWRITE), CreateOpts.checksumParam(ChecksumOpt.createDisabled())) } override def createAtomic( @@ -321,7 +323,7 @@ class FileContextBasedCheckpointFileManager(path: Path, hadoopConf: Configuratio fc.util.exists(path) } - override def rename(srcPath: Path, dstPath: Path, overwriteIfPossible: Boolean): Unit = { + override def renameTempFile(srcPath: Path, dstPath: Path, overwriteIfPossible: Boolean): Unit = { import Options.Rename._ fc.rename(srcPath, dstPath, if (overwriteIfPossible) OVERWRITE else NONE) } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManagerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManagerSuite.scala index 185b96c83e285..83f42f9a59ff3 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManagerSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManagerSuite.scala @@ -25,6 +25,7 @@ import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs._ import org.apache.spark.SparkFunSuite +import org.apache.spark.sql.catalyst.util.quietly import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.test.SharedSparkSession import org.apache.spark.util.Utils @@ -61,9 +62,11 @@ abstract class CheckpointFileManagerTests extends SparkFunSuite { assert(!fm.exists(path)) fm.createAtomic(path, overwriteIfPossible = false).close() assert(fm.exists(path)) - intercept[IOException] { - // should throw exception since file exists and overwrite is false - fm.createAtomic(path, overwriteIfPossible = false).close() + quietly { + intercept[IOException] { + // should throw exception since file exists and overwrite is false + fm.createAtomic(path, overwriteIfPossible = false).close() + } } // Create atomic with overwrite if possible @@ -107,22 +110,22 @@ class CheckpointFileManagerSuite extends SparkFunSuite with SharedSparkSession { test("CheckpointFileManager.create() should fallback from FileContext to FileSystem") { import FakeFileSystem.scheme - spark.conf.set( - s"fs.$scheme.impl", - classOf[FakeFileSystem].getName) - withTempDir { temp => - val metadataLog = new HDFSMetadataLog[String](spark, s"$scheme://${temp.toURI.getPath}") - assert(metadataLog.add(0, "batch0")) - assert(metadataLog.getLatest() === Some(0 -> "batch0")) - 
assert(metadataLog.get(0) === Some("batch0")) - assert(metadataLog.get(None, Some(0)) === Array(0 -> "batch0")) - - - val metadataLog2 = new HDFSMetadataLog[String](spark, s"$scheme://${temp.toURI.getPath}") - assert(metadataLog2.get(0) === Some("batch0")) - assert(metadataLog2.getLatest() === Some(0 -> "batch0")) - assert(metadataLog2.get(None, Some(0)) === Array(0 -> "batch0")) + spark.conf.set(s"fs.$scheme.impl", classOf[FakeFileSystem].getName) + quietly { + withTempDir { temp => + val metadataLog = new HDFSMetadataLog[String](spark, s"$scheme://${temp.toURI.getPath}") + assert(metadataLog.add(0, "batch0")) + assert(metadataLog.getLatest() === Some(0 -> "batch0")) + assert(metadataLog.get(0) === Some("batch0")) + assert(metadataLog.get(None, Some(0)) === Array(0 -> "batch0")) + + val metadataLog2 = new HDFSMetadataLog[String](spark, s"$scheme://${temp.toURI.getPath}") + assert(metadataLog2.get(0) === Some("batch0")) + assert(metadataLog2.getLatest() === Some(0 -> "batch0")) + assert(metadataLog2.get(None, Some(0)) === Array(0 -> "batch0")) + + } } } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala index 610f0cab955b0..157227f2c4545 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala @@ -85,7 +85,8 @@ class HDFSMetadataLogSuite extends SparkFunSuite with SharedSQLContext { // There should be exactly one file, called "2", in the metadata directory. // This check also tests for regressions of SPARK-17475 - val allFiles = new File(metadataLog.metadataPath.toString).listFiles().toSeq + val allFiles = new File(metadataLog.metadataPath.toString).listFiles() + .filter(!_.getName.startsWith(".")).toSeq assert(allFiles.size == 1) assert(allFiles(0).getName() == "2") } @@ -136,7 +137,7 @@ class HDFSMetadataLogSuite extends SparkFunSuite with SharedSQLContext { } } - test("HDFSMetadataLog: metadata directory collision") { + testQuietly("HDFSMetadataLog: metadata directory collision") { withTempDir { temp => val waiter = new Waiter val maxBatchId = 100 From 35c66367ef5c1a88689ddcaad52ff36f4ee91494 Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Thu, 12 Apr 2018 10:19:09 -0700 Subject: [PATCH 5/9] Addressed comments --- .../streaming/CheckpointFileManager.scala | 9 +++++---- .../streaming/CheckpointFileManagerSuite.scala | 17 ++++++++++------- .../CompactibleFileStreamLogSuite.scala | 4 ---- .../streaming/HDFSMetadataLogSuite.scala | 8 -------- 4 files changed, 15 insertions(+), 23 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManager.scala index 5d3faed525770..4d69a21381151 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManager.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManager.scala @@ -16,12 +16,11 @@ */ package org.apache.spark.sql.execution.streaming -import java.io.{FileSystem => _, _} +import java.io.{FileNotFoundException, IOException, OutputStream} import java.util.{EnumSet, UUID} import scala.util.control.NonFatal -import org.apache.commons.io.IOUtils import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs._ import 
org.apache.hadoop.fs.local.{LocalFs, RawLocalFs} @@ -228,7 +227,8 @@ class FileSystemBasedCheckpointFileManager(path: Path, hadoopConf: Configuration } override def createAtomic( - path: Path, overwriteIfPossible: Boolean): CancellableFSDataOutputStream = { + path: Path, + overwriteIfPossible: Boolean): CancellableFSDataOutputStream = { new RenameBasedFSDataOutputStream(this, path, overwriteIfPossible) } @@ -311,7 +311,8 @@ class FileContextBasedCheckpointFileManager(path: Path, hadoopConf: Configuratio } override def createAtomic( - path: Path, overwriteIfPossible: Boolean): CancellableFSDataOutputStream = { + path: Path, + overwriteIfPossible: Boolean): CancellableFSDataOutputStream = { new RenameBasedFSDataOutputStream(this, path, overwriteIfPossible) } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManagerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManagerSuite.scala index 83f42f9a59ff3..b62de25bb3f6c 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManagerSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManagerSuite.scala @@ -109,8 +109,8 @@ class CheckpointFileManagerSuite extends SparkFunSuite with SharedSparkSession { } test("CheckpointFileManager.create() should fallback from FileContext to FileSystem") { - import FakeFileSystem.scheme - spark.conf.set(s"fs.$scheme.impl", classOf[FakeFileSystem].getName) + import CheckpointFileManagerSuiteFileSystem.scheme + spark.conf.set(s"fs.$scheme.impl", classOf[CheckpointFileManagerSuiteFileSystem].getName) quietly { withTempDir { temp => val metadataLog = new HDFSMetadataLog[String](spark, s"$scheme://${temp.toURI.getPath}") @@ -177,15 +177,18 @@ object TestCheckpointFileManager { } -/** FakeFileSystem to test fallback of the HDFSMetadataLog from FileContext to FileSystem API */ -private class FakeFileSystem extends RawLocalFileSystem { - import FakeFileSystem.scheme +/** + * CheckpointFileManagerSuiteFileSystem to test fallback of the CheckpointFileManager + * from FileContext to FileSystem API. 
+ */ +private class CheckpointFileManagerSuiteFileSystem extends RawLocalFileSystem { + import CheckpointFileManagerSuiteFileSystem.scheme override def getUri: URI = { URI.create(s"$scheme:///") } } -private object FakeFileSystem { - val scheme = s"HDFSMetadataLogSuite${math.abs(Random.nextInt)}" +private object CheckpointFileManagerSuiteFileSystem { + val scheme = s"CheckpointFileManagerSuiteFileSystem${math.abs(Random.nextInt)}" } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLogSuite.scala index 12eaf63415081..ecf83bce83a97 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLogSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLogSuite.scala @@ -27,10 +27,6 @@ import org.apache.spark.sql.test.SharedSQLContext class CompactibleFileStreamLogSuite extends SparkFunSuite with SharedSQLContext { - /** To avoid caching of FS objects */ - override protected def sparkConf = - super.sparkConf.set(s"spark.hadoop.fs.$scheme.impl.disable.cache", "true") - import CompactibleFileStreamLog._ /** -- testing of `object CompactibleFileStreamLog` begins -- */ diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala index 157227f2c4545..9268306ce4275 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala @@ -18,27 +18,19 @@ package org.apache.spark.sql.execution.streaming import java.io.File -import java.net.URI import java.util.ConcurrentModificationException import scala.language.implicitConversions -import scala.util.Random -import org.apache.hadoop.fs._ import org.scalatest.concurrent.Waiters._ import org.scalatest.time.SpanSugar._ import org.apache.spark.SparkFunSuite -import org.apache.spark.sql.execution.streaming.FakeFileSystem._ import org.apache.spark.sql.test.SharedSQLContext import org.apache.spark.util.UninterruptibleThread class HDFSMetadataLogSuite extends SparkFunSuite with SharedSQLContext { - /** To avoid caching of FS objects */ - override protected def sparkConf = - super.sparkConf.set(s"spark.hadoop.fs.$scheme.impl.disable.cache", "true") - private implicit def toOption[A](a: A): Option[A] = Option(a) test("HDFSMetadataLog: basic") { From 1818cb2e173b87b145be8746a1e9a3a48ce038c1 Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Thu, 12 Apr 2018 10:21:16 -0700 Subject: [PATCH 6/9] Removed empty lines --- .../sql/execution/streaming/CheckpointFileManagerSuite.scala | 2 -- 1 file changed, 2 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManagerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManagerSuite.scala index b62de25bb3f6c..834a7e8606693 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManagerSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManagerSuite.scala @@ -119,12 +119,10 @@ class CheckpointFileManagerSuite extends SparkFunSuite with SharedSparkSession { assert(metadataLog.get(0) === Some("batch0")) assert(metadataLog.get(None, Some(0)) === Array(0 
-> "batch0")) - val metadataLog2 = new HDFSMetadataLog[String](spark, s"$scheme://${temp.toURI.getPath}") assert(metadataLog2.get(0) === Some("batch0")) assert(metadataLog2.getLatest() === Some(0 -> "batch0")) assert(metadataLog2.get(None, Some(0)) === Array(0 -> "batch0")) - } } } From 12177b19ef924a6f4fee2d3f774beefae19cc0f0 Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Thu, 12 Apr 2018 10:42:34 -0700 Subject: [PATCH 7/9] Addressed one more comment --- .../streaming/CheckpointFileManagerSuite.scala | 16 ++++++++-------- .../CompactibleFileStreamLogSuite.scala | 1 - .../streaming/state/StateStoreSuite.scala | 14 ++++++++------ 3 files changed, 16 insertions(+), 15 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManagerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManagerSuite.scala index 834a7e8606693..8e75f4582fd55 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManagerSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManagerSuite.scala @@ -101,10 +101,10 @@ class CheckpointFileManagerSuite extends SparkFunSuite with SharedSparkSession { test("CheckpointFileManager.create() should pick up user-specified class from conf") { withSQLConf( SQLConf.STREAMING_CHECKPOINT_FILE_MANAGER_CLASS.parent.key -> - classOf[TestCheckpointFileManager].getName) { + classOf[CreateAtomicTestManager].getName) { val fileManager = CheckpointFileManager.create(new Path("/"), spark.sessionState.newHadoopConf) - assert(fileManager.isInstanceOf[TestCheckpointFileManager]) + assert(fileManager.isInstanceOf[CreateAtomicTestManager]) } } @@ -142,34 +142,34 @@ class FileSystemBasedCheckpointFileManagerSuite extends CheckpointFileManagerTes /** A fake implementation to test different characteristics of CheckpointFileManager interface */ -class TestCheckpointFileManager(path: Path, hadoopConf: Configuration) +class CreateAtomicTestManager(path: Path, hadoopConf: Configuration) extends FileSystemBasedCheckpointFileManager(path, hadoopConf) { import CheckpointFileManager._ override def createAtomic(path: Path, overwrite: Boolean): CancellableFSDataOutputStream = { - if (TestCheckpointFileManager.shouldFailInCreateAtomic) { - TestCheckpointFileManager.cancelCalledInCreateAtomic = false + if (CreateAtomicTestManager.shouldFailInCreateAtomic) { + CreateAtomicTestManager.cancelCalledInCreateAtomic = false } val originalOut = super.createAtomic(path, overwrite) new CancellableFSDataOutputStream(originalOut) { override def close(): Unit = { - if (TestCheckpointFileManager.shouldFailInCreateAtomic) { + if (CreateAtomicTestManager.shouldFailInCreateAtomic) { throw new IOException("Copy failed intentionally") } super.close() } override def cancel(): Unit = { - TestCheckpointFileManager.cancelCalledInCreateAtomic = true + CreateAtomicTestManager.cancelCalledInCreateAtomic = true originalOut.cancel() } } } } -object TestCheckpointFileManager { +object CreateAtomicTestManager { @volatile var shouldFailInCreateAtomic = false @volatile var cancelCalledInCreateAtomic = false } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLogSuite.scala index ecf83bce83a97..ec961a9ecb592 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLogSuite.scala +++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLogSuite.scala @@ -22,7 +22,6 @@ import java.nio.charset.StandardCharsets._ import org.apache.spark.{SparkConf, SparkFunSuite} import org.apache.spark.sql.SparkSession -import org.apache.spark.sql.execution.streaming.FakeFileSystem._ import org.apache.spark.sql.test.SharedSQLContext class CompactibleFileStreamLogSuite extends SparkFunSuite with SharedSQLContext { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala index 276cff24511da..73f8705060402 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala @@ -475,14 +475,14 @@ class StateStoreSuite extends StateStoreSuiteBase[HDFSBackedStateStoreProvider] val hadoopConf = new Configuration() hadoopConf.set( SQLConf.STREAMING_CHECKPOINT_FILE_MANAGER_CLASS.parent.key, - classOf[TestCheckpointFileManager].getName) + classOf[CreateAtomicTestManager].getName) val remoteDir = Utils.createTempDir().getAbsolutePath val provider = newStoreProvider( opId = Random.nextInt, partition = 0, dir = remoteDir, hadoopConf = hadoopConf) // Disable failure of output stream and generate versions - TestCheckpointFileManager.shouldFailInCreateAtomic = false + CreateAtomicTestManager.shouldFailInCreateAtomic = false for (version <- 1 to 10) { val store = provider.getStore(version - 1) put(store, version.toString, version) // update "1" -> 1, "2" -> 2, ... @@ -490,19 +490,21 @@ class StateStoreSuite extends StateStoreSuiteBase[HDFSBackedStateStoreProvider] } val version10Data = (1L to 10).map(_.toString).map(x => x -> x).toSet + CreateAtomicTestManager.cancelCalledInCreateAtomic = false val store = provider.getStore(10) // Fail commit for next version and verify that reloading resets the files - TestCheckpointFileManager.shouldFailInCreateAtomic = true + CreateAtomicTestManager.shouldFailInCreateAtomic = true put(store, "11", 11) val e = intercept[IllegalStateException] { quietly { store.commit() } } - assert(e.getCause.isInstanceOf[IOException], "Was waiting the IOException to be thrown") - TestCheckpointFileManager.shouldFailInCreateAtomic = false + assert(e.getCause.isInstanceOf[IOException]) + CreateAtomicTestManager.shouldFailInCreateAtomic = false // Abort commit for next version and verify that reloading resets the files + CreateAtomicTestManager.cancelCalledInCreateAtomic = false val store2 = provider.getStore(10) put(store2, "11", 11) store2.abort() - assert(TestCheckpointFileManager.cancelCalledInCreateAtomic) + assert(CreateAtomicTestManager.cancelCalledInCreateAtomic) } override def newStoreProvider(): HDFSBackedStateStoreProvider = { From ef05009e491d1ffdca2a37ba0441ea8507756e3d Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Thu, 12 Apr 2018 17:25:47 -0700 Subject: [PATCH 8/9] Fixed synchronization bug --- .../streaming/state/HDFSBackedStateStoreProvider.scala | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala index 7679d243c0922..df722b953228b 100644 --- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala @@ -129,8 +129,7 @@ private[state] class HDFSBackedStateStoreProvider extends StateStoreProvider wit verify(state == UPDATING, "Cannot commit after already committed or aborted") try { - finalizeDeltaFile(compressedStream) - loadedMaps.put(newVersion, mapToUpdate) + commitUpdates(newVersion, mapToUpdate, compressedStream) state = COMMITTED logInfo(s"Committed version $newVersion for $this to file $finalDeltaFile") newVersion @@ -250,6 +249,13 @@ private[state] class HDFSBackedStateStoreProvider extends StateStoreProvider wit private case class StoreFile(version: Long, path: Path, isSnapshot: Boolean) + private def commitUpdates(newVersion: Long, map: MapType, output: DataOutputStream): Unit = { + synchronized { + finalizeDeltaFile(output) + loadedMaps.put(newVersion, map) + } + } + /** * Get iterator of all the data of the latest version of the store. * Note that this will look up the files to determined the latest known version. From c5b0c98257e39d6af2dd8f702b8cbc9f9e6fabe9 Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Fri, 13 Apr 2018 12:22:28 -0700 Subject: [PATCH 9/9] Addressed comments --- .../streaming/CheckpointFileManager.scala | 20 ++++++++++--------- .../execution/streaming/HDFSMetadataLog.scala | 16 --------------- .../CheckpointFileManagerSuite.scala | 2 +- 3 files changed, 12 insertions(+), 26 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManager.scala index 4d69a21381151..606ba250ad9d2 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManager.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManager.scala @@ -237,7 +237,12 @@ class FileSystemBasedCheckpointFileManager(path: Path, hadoopConf: Configuration } override def exists(path: Path): Boolean = { - fs.exists(path) + try + return fs.getFileStatus(path) != null + catch { + case e: FileNotFoundException => + return false + } } override def renameTempFile(srcPath: Path, dstPath: Path, overwriteIfPossible: Boolean): Unit = { @@ -247,17 +252,14 @@ class FileSystemBasedCheckpointFileManager(path: Path, hadoopConf: Configuration } if (!fs.rename(srcPath, dstPath)) { - // If overwriteIfPossible = false, then we want to find out why the rename failed and - // try to throw the right error. + // FileSystem.rename() returning false is very ambiguous as it can be for many reasons. + // This tries to make a best effort attempt to return the most appropriate exception. if (fs.exists(dstPath)) { - // Some implementations of FileSystem may only return false instead of throwing - // FileAlreadyExistsException. In that case, explicitly throw the error the error - // if overwriteIfPossible = false. Note that this is definitely not atomic. - // This is only a best-effort attempt to identify the situation when rename returned - // false. 
if (!overwriteIfPossible) { - throw new FileAlreadyExistsException(s"$dstPath already exists") + throw new FileAlreadyExistsException(s"Failed to rename as $dstPath already exists") } + } else if (!fs.exists(srcPath)) { + throw new FileNotFoundException(s"Failed to rename as $srcPath was not found") } else { val msg = s"Failed to rename temp file $srcPath to $dstPath as rename returned false" logWarning(msg) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala index 4acb221f4c12c..bd0a46115ceb0 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala @@ -137,22 +137,6 @@ class HDFSMetadataLog[T <: AnyRef : ClassTag](sparkSession: SparkSession, path: } } - /** - * @return the deserialized metadata in a batch file, or None if file not exist. - * @throws IllegalArgumentException when path does not point to a batch file. - */ - def get(batchFile: Path): Option[T] = { - if (fileManager.exists(batchFile)) { - if (isBatchFile(batchFile)) { - get(pathToBatchId(batchFile)) - } else { - throw new IllegalArgumentException(s"File ${batchFile} is not a batch file!") - } - } else { - None - } - } - override def get(batchId: Long): Option[T] = { val batchMetadataFile = batchIdToPath(batchId) if (fileManager.exists(batchMetadataFile)) { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManagerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManagerSuite.scala index 8e75f4582fd55..fe59cb25d5005 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManagerSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManagerSuite.scala @@ -34,7 +34,7 @@ abstract class CheckpointFileManagerTests extends SparkFunSuite { def createManager(path: Path): CheckpointFileManager - test("mkdirs, list, createAtomic, open, delete") { + test("mkdirs, list, createAtomic, open, delete, exists") { withTempPath { p => val basePath = new Path(p.getAbsolutePath) val fm = createManager(basePath)