
Conversation

@tdas
Contributor

@tdas tdas commented Apr 11, 2018

What changes were proposed in this pull request?

Checkpoint files (offset log files, state store files) in Structured Streaming must be written atomically so that no partial files are generated (partial files would break the fault-tolerance guarantees). Currently, there are three locations that each try to do this on their own, and in some cases incorrectly.

  1. HDFSOffsetMetadataLog - This uses a FileManager interface so it can work with any implementation of the FileSystem or FileContext APIs. It preferably loads the FileContext implementation, as FileContext on HDFS has atomic renames.
  2. HDFSBackedStateStore (aka in-memory state store)
  • Writing a version.delta file - This uses only the FileSystem APIs to perform a rename. This is incorrect, as rename is not atomic in the HDFS FileSystem implementation.
  • Writing a snapshot file - Same as above.

Current problems:

  1. State Store behavior is incorrect - the HDFS FileSystem implementation does not have atomic rename.
  2. Inflexible - Some file systems provide mechanisms other than write-to-temp-file-and-rename for writing atomically and more efficiently. For example, with S3 you can write directly to the final file, and it is made visible only when the entire file is written and closed correctly. Any failure can be made to terminate the write without making any partial files visible in S3. The current code does not abstract this mechanism out well enough for it to be customized.

Solution:

  1. Introduce a common interface that all 3 cases above can use to write checkpoint files atomically.
  2. This interface must provide the necessary hooks to allow customization of the write-and-rename mechanism.

This PR does that by introducing the interface CheckpointFileManager and modifying HDFSMetadataLog and HDFSBackedStateStore to use it. As with the earlier FileManager, there are implementations based on the FileSystem and FileContext APIs, and the latter is preferred so that it works correctly with HDFS.

The key method in this interface is createAtomic(path, overwrite), which returns a CancellableFSDataOutputStream that has a cancel() method. All users of this method must either call close() to write the file successfully, or cancel() in case of an error.
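For illustration, the intended usage pattern looks roughly like this (a minimal sketch; the manager instance and the payload variable are hypothetical names, not from the PR):

import scala.util.control.NonFatal

val out = checkpointFileManager.createAtomic(finalPath, overwriteIfPossible = false)
try {
  out.write(serializedCheckpointBytes)  // write the full checkpoint payload
  out.close()                           // close() publishes the file atomically
} catch {
  case NonFatal(e) =>
    out.cancel()                        // cancel() discards any partial output
    throw e
}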

How was this patch tested?

New tests in CheckpointFileManagerSuite and slightly modified existing tests.

Contributor

@brkyvz brkyvz left a comment

LGTM. Left a couple nits. This is awesome!

*/
package org.apache.spark.sql.execution.streaming

import java.io.{FileSystem => _, _}
Contributor

whoa what does FileSystem => _ do?

Contributor Author

whoa... I don't know either. My IntelliJ did some weird magic :/
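For reference, Name => _ inside a Scala import clause hides that member from the wildcard that follows, so another import can supply the name unambiguously. A generic illustration, with classes chosen only to show the syntax (not from this PR):

import java.util.{Date => _, _}  // everything in java.util except Date
import java.sql.Date             // Date now unambiguously means java.sql.Date

Here it presumably keeps a FileSystem name out of the java.io wildcard so that Hadoop's FileSystem is the only one in scope.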

this(fm, path, generateTempPath(path), overwrite)
}

logInfo(s"Writing atomically to $finalPath using temp file $tempPath")
Contributor

nit: logDebug

Contributor Author

I was thinking of having it as logInfo, so that we can debug stuff if the renaming goes wrong in some way.

}

override def createAtomic(
path: Path, overwriteIfPossible: Boolean): CancellableFSDataOutputStream = {
Contributor

nit: separate lines please
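That is, one parameter per line; a sketch of what the reviewer is asking for:

override def createAtomic(
    path: Path,
    overwriteIfPossible: Boolean): CancellableFSDataOutputStream = {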

}

override def createAtomic(
path: Path, overwriteIfPossible: Boolean): CancellableFSDataOutputStream = {
Contributor

ditto on two lines

}

try {
if (!fs.rename(srcPath, dstPath) && !overwriteIfPossible) {
Contributor

we should ensure that these are file paths and not directories, in case someone else tries to use these APIs elsewhere

Contributor Author

Well, they should not. This is specifically designed for one purpose.
Also, generally in all implementations, rename fails if you are doing something nonsensical like renaming a file to a path where a directory already exists.

Essentially, I want to keep this code absolutely minimal so that it's easy to reason about and has minimal latency. Each check of whether the path is a file or not adds to the latency, and most implementations do those checks anyway. That's why there were patches in the past that removed unnecessary checks (e.g. e24f21b).

}

private object FakeFileSystem {
val scheme = s"HDFSMetadataLogSuite${math.abs(Random.nextInt)}"
Contributor

plagiarizer! :P

Contributor Author

Hahaha.

val store2 = provider.getStore(10)
put(store2, "11", 11)
store2.abort()
assert(TestCheckpointFileManager.cancelCalledInCreateAtomic)
Contributor

can you verify that it was false before the abort?
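For example, the extra check being requested could look like this (a sketch reusing the names from the quoted test):

val store2 = provider.getStore(10)
put(store2, "11", 11)
assert(!TestCheckpointFileManager.cancelCalledInCreateAtomic)  // not yet cancelled
store2.abort()
assert(TestCheckpointFileManager.cancelCalledInCreateAtomic)   // abort() cancelled the atomic write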

@SparkQA

SparkQA commented Apr 12, 2018

Test build #89223 has finished for PR 21048 at commit df7b339.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • .doc(\"The class used to write checkpoint files atomically. This class must be a subclass \" +
  • trait CheckpointFileManager
  • abstract class CancellableFSDataOutputStream(protected val underlyingStream: OutputStream)
  • sealed class RenameBasedFSDataOutputStream(
  • class FileSystemBasedCheckpointFileManager(path: Path, hadoopConf: Configuration)
  • class FileContextBasedCheckpointFileManager(path: Path, hadoopConf: Configuration)

@SparkQA

SparkQA commented Apr 12, 2018

Test build #89226 has finished for PR 21048 at commit f1fc175.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • sealed trait RenameHelperMethods

@SparkQA

SparkQA commented Apr 12, 2018

Test build #89227 has finished for PR 21048 at commit f9965f1.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Apr 12, 2018

Test build #89232 has finished for PR 21048 at commit 1b23492.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Apr 12, 2018

Test build #89282 has finished for PR 21048 at commit 1818cb2.

  • This patch fails to build.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Apr 12, 2018

Test build #89283 has finished for PR 21048 at commit 75b9b18.

  • This patch fails to build.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Apr 12, 2018

Test build #89287 has finished for PR 21048 at commit 12177b1.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • class CreateAtomicTestManager(path: Path, hadoopConf: Configuration)


if (!fs.rename(srcPath, dstPath)) {
// If overwriteIfPossible = false, then we want to find out why the rename failed and
// try to throw the right error.
Contributor Author

Fix this comment.

@SparkQA

SparkQA commented Apr 13, 2018

Test build #89310 has finished for PR 21048 at commit ef05009.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

s"Failed to rename $srcPath to $dstPath as destination already exists")
}

if (!fs.rename(srcPath, dstPath)) {
Contributor

there's a lot of ambiguity about "what does it mean if FileSystem.rename() returns false?". FileContext.rename() is stricter here. One trouble spot in particular is "what if the source file is not there?"; that normally gets downgraded to a "return false".

Proposed: add a fs.getFileStatus(srcPath) before the rename call, as it will raise an FNFE if the source file isn't visible. Low cost against HDFS; more against remote object stores, but S3 isn't going to use this one, is it?

Contributor Author

Ideally, I would like the default implementation with rename to work correctly for all FileSystems, including S3AFileSystem (it may not be efficient, but it should be correct).

Also, it is important to note that this CheckpointFileManager is built not as a general-purpose common API across FileSystem and FileContext, but specifically for the purpose of checkpointing. So in this case, it is absolutely not expected that the source path (i.e. the temp file just written by RenameBasedFSDataOutputStream) is not present. So I don't think it's worth adding another RPC in the common path just to handle that unexpected case. What I can do is add another check after the rename returns false, to try to throw FileNotFoundException instead of the fallback IOException.
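A sketch of the kind of check described above, using the names from the quoted snippet (the exception messages are illustrative):

if (!fs.rename(srcPath, dstPath)) {
  // Rename failed; try to surface a specific error rather than a generic IOException.
  if (!fs.exists(srcPath)) {
    throw new FileNotFoundException(s"Failed to rename as $srcPath was not found")
  } else {
    throw new IOException(s"Failed to rename temp file $srcPath to $dstPath")
  }
}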

Contributor

Makes sense.

As you are doing a one-file rename, the rename() will have the atomic-operation semantics you want everywhere; it's just O(data) on S3 and Swift. The direct write on both of those is what's critical to keep checkpoint writes from blocking while that rename takes place, but it isn't needed for the checkpoint output commit itself.

fs.delete(path, true)
} catch {
case e: FileNotFoundException =>
logInfo(s"Failed to delete $path as it does not exist")
Contributor

Should never be raised by a filesystem. Have you actually seen this?

Contributor Author

@tdas tdas Apr 13, 2018

Not really. This is just precautionary, as I really haven't seen all the code of all FileSystem implementations. Adding this doesn't add any overhead.

Contributor

Basic FS compliance tests, which even GCS now runs, require that delete just returns false if there's no path. I wouldn't personally worry about it, as if it ever did arise, something would have seriously broken.

Contributor Author

Well, some people have to deal with older versions of FileSystems, not just the latest "good" ones. :)
As I said, since there isn't any overhead, I am inclined to keep it in there.

}

override def exists(path: Path): Boolean = {
fs.exists(path)
Contributor

This is tagged as deprecated in Hadoop 3, as too much code was doing an exists() check alongside some other reinvocation of the same underlying code, e.g. if (fs.exists(path) && fs.isFile(path)) { ... }. If you call fs.getFileStatus() directly, you get an FNFE raised if the path isn't there and a FileStatus if it is, which can often be used.

Contributor Author

Then basically I have to just copy the FileSystem.exists() code right here :)
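That inlined version would just be a small wrapper around getFileStatus; a sketch, assuming the fs field from the surrounding class:

override def exists(path: Path): Boolean = {
  try {
    fs.getFileStatus(path)  // throws FileNotFoundException if the path is absent
    true
  } catch {
    case _: FileNotFoundException => false
  }
}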

Contributor

yeah, tough call. Having looked through much of the Spark/Hive/etc. code, they too often do that exists() before delete, getFileStatus, ... which is why we tag it as deprecated: to make clear that the workflow is often inefficient on non-HDFS, non-POSIX stores (on HDFS and POSIX it's fine).

assert(!fm.exists(dir))
fm.mkdirs(dir)
assert(fm.exists(dir))
fm.mkdirs(dir)
Contributor

if that fm.exists() call were replaced with an fm.getFileStatus() operation, as suggested earlier, this assert could become assert(fm.getFileStatus(dir).isDirectory)

Contributor Author

This is a test. I think it is fine.

@zsxwing
Member

zsxwing commented Apr 13, 2018

LGTM

@tdas
Contributor Author

tdas commented Apr 13, 2018

@steveloughran @brkyvz @zsxwing Thanks for your comments :)

@SparkQA

SparkQA commented Apr 13, 2018

Test build #89357 has finished for PR 21048 at commit c5b0c98.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@tdas
Contributor Author

tdas commented Apr 13, 2018

I am merging this to master. Once again, thank you for your reviews.

@asfgit asfgit closed this in cbb41a0 Apr 13, 2018
.doc("The class used to write checkpoint files atomically. This class must be a subclass " +
"of the interface CheckpointFileManager.")
.internal()
.stringConf
Member

createOptional? cc @zsxwing @tdas

Contributor

@HeartSaVioR can you help to fix this minor issue?

Contributor

This does not make any difference - we read the value from the Hadoop Configuration instance (not sure why). Changing this to read from SQLConf is no longer a minor change, as CheckpointFileManager doesn't get a SQLConf instance.

@steveloughran
Contributor

if this is being looked at again, it'd be nice to have a reference back end which just did the write straight to the destination: this is exactly what all the public cloud stores (s3, azure-*, gcs) offer as their atomic write. There'd be no need to make things clever and automatically switch; just make it something people can ask for.

fc.create(path, EnumSet.of(CREATE, OVERWRITE))
import Options._
fc.create(
path, EnumSet.of(CREATE, OVERWRITE), CreateOpts.checksumParam(ChecksumOpt.createDisabled()))
Contributor

@tdas Is there any specific reason why we disabled the Checksum?

Contributor Author

this was a long time ago, so I don't really remember. But my guess is that the presence or absence of checksum files can produce unexpected errors... hence we were avoiding that confusion.

Contributor

Thanks for getting back on this. We hit a nasty bug in Hadoop with files with a null checksum, which wouldn't allow the NodeManager to start, and we wanted to disable this.
