
Conversation

@jerryshao (Contributor)

What changes were proposed in this pull request?

The current metadata log in FileStreamSource adds a checkpoint file for each batch but has no ability to remove or compact old entries, which leads to a large number of small files when a query runs for a long time. This PR proposes to compact the old logs into one file. The method is quite similar to FileStreamSinkLog's, but simpler.
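
For reference, the compaction rule can be sketched as follows (a minimal sketch assuming a compactInterval setting; names mirror the PR but the code is illustrative, not the exact implementation):

    // Every compactInterval-th batch is a compaction batch whose log file
    // folds in the entries of all earlier batches.
    def isCompactionBatch(batchId: Long, compactInterval: Int): Boolean =
      (batchId + 1) % compactInterval == 0

    // E.g. with compactInterval = 10, batches 9, 19, 29, ... are compaction
    // batches, so at most compactInterval - 1 small files sit between compact files.
    assert(isCompactionBatch(9, 10) && !isCompactionBatch(10, 10))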

How was this patch tested?

Unit test added.


SparkQA commented Jun 5, 2016

Test build #60000 has finished for PR 13513 at commit 2ed1115.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • class FileStreamSourceLog(sparkSession: SparkSession, path: String)

Contributor:

I'd move (was $compactInterval) to the end of the message.

@jerryshao (Contributor Author)

Jenkins, retest this please.


SparkQA commented Jun 6, 2016

Test build #60060 has finished for PR 13513 at commit 2ed1115.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • class FileStreamSourceLog(sparkSession: SparkSession, path: String)


SparkQA commented Jun 6, 2016

Test build #60071 has finished for PR 13513 at commit 798c450.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@jerryshao (Contributor Author)

@tdas @zsxwing, what are your thoughts on this PR? Thanks a lot.


zsxwing commented Sep 8, 2016

@jerryshao the approach seems good to me. Could you refactor the code to avoid copying from FileStreamSinkLog? Duplicated code is hard to maintain.

@jerryshao (Contributor Author)

Sure, I will change the code.


SparkQA commented Sep 12, 2016

Test build #65244 has finished for PR 13513 at commit c2aad87.

  • This patch fails to build.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class FileEntry(path: String, timestamp: Timestamp, action: String = ADD_ACTION)
    • class FileStreamSourceLog(sparkSession: SparkSession, path: String)

@jerryshao (Contributor Author)

@zsxwing, thanks a lot for your comments. I did several refactorings:

  1. Abstracted and consolidated FileStreamSinkLog and FileStreamSourceLog; they now share the same code path for compaction (see the sketch below).
  2. Changed FileStreamSourceLog to use JSON instead of binary encoding, adding compatibility and flexibility for future extension.
  3. Improved the logic for fetching all metadata logs: if a compact log exists, only the compact log is scanned.

Please take another look, thanks a lot.
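
As a rough, self-contained sketch of refactoring 1 (an in-memory map stands in for HDFSMetadataLog; the real class signatures in the PR differ):

    import scala.collection.mutable

    // Illustrative stand-in for the shared compaction path; the real
    // CompactibleFileStreamLog extends HDFSMetadataLog and persists to HDFS.
    abstract class CompactibleLogSketch[T] {
      protected def compactInterval: Int

      private val store = mutable.SortedMap.empty[Long, Seq[T]] // batchId -> entries

      def isCompactionBatch(batchId: Long): Boolean =
        (batchId + 1) % compactInterval == 0

      def add(batchId: Long, logs: Seq[T]): Boolean = {
        val toWrite =
          if (isCompactionBatch(batchId)) {
            // Fold everything since the previous compaction batch (which
            // already subsumes its own predecessors) into one record.
            val from = math.max(0L, batchId - compactInterval)
            store.range(from, batchId).values.flatten.toSeq ++ logs
          } else {
            logs
          }
        store.put(batchId, toWrite).isEmpty // false if this batch already existed
      }

      def get(batchId: Long): Option[Seq[T]] = store.get(batchId)
    }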


SparkQA commented Sep 12, 2016

Test build #65245 has finished for PR 13513 at commit f179349.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • abstract class CompactibleFileStreamLog[T: ClassTag](


SparkQA commented Sep 12, 2016

Test build #65246 has finished for PR 13513 at commit 31340b5.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.


zsxwing commented Sep 12, 2016

Just noticed that FileStreamSource.getBatch(start: Option[Offset], end: Offset) is broken in this PR. start could be an arbitrary offset.

I think we need to store the batchId together with its file paths in the metadata log. FileStreamSource.getBatch(start: Option[Offset], end: Offset) could be very slow when all batches are in the same file, because we would need to parse the whole file to get the mapping from batchId to files. However, in most cases FileStreamSource.getBatch only queries the latest batch, so if we don't compact the latest metadata file, we can make it pretty fast by reading one small file in most cases. When recovering from failure, the performance of FileStreamSource.getBatch doesn't really matter.
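
Concretely, this is essentially the FileEntry shape the PR ends up with (see the Sep 14 build summary below); the type alias and sentinel here are assumptions made to keep the sketch self-contained:

    object FileEntrySketch {
      type Timestamp = Long      // assumed alias for an epoch-millis timestamp
      val NOT_SET: Long = -1L    // assumed sentinel for "batch id unknown"

      case class FileEntry(path: String, timestamp: Timestamp, batchId: Long = NOT_SET)

      // With batch ids stored per entry, getBatch(start, end) can filter a
      // compacted log instead of replaying every per-batch file.
      def entriesInRange(all: Seq[FileEntry], start: Long, end: Long): Seq[FileEntry] =
        all.filter(e => e.batchId > start && e.batchId <= end)
    }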


frreiss commented Sep 12, 2016

You could just move the metadata deletion logic from FileStreamSinkLog into CompactibleFileStreamLog. Then FileStreamSource could issue DELETE log records for files that are older than FileStreamSource.lastPurgeTimestamp.
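
A minimal sketch of that idea, reusing the add/delete action strings in the style of the sink log (a FileEntry with an action field matches the earlier build summary; the lastPurgeTimestamp handling is elided):

    object DeleteAtCompactionSketch {
      val ADD_ACTION = "add"
      val DELETE_ACTION = "delete"
      case class FileEntry(path: String, timestamp: Long, action: String = ADD_ACTION)

      // At compaction time, entries with a matching DELETE record are dropped,
      // so purged files never carry over into the compacted log.
      def compactLogs(logs: Seq[FileEntry]): Seq[FileEntry] = {
        val deleted = logs.collect { case e if e.action == DELETE_ACTION => e.path }.toSet
        logs.filter(e => e.action == ADD_ACTION && !deleted.contains(e.path))
      }
    }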

@jerryshao (Contributor Author)

@zsxwing @frreiss thanks a lot for your comments.

I think the semantics of FileStreamSource.getBatch(start: Option[Offset], end: Offset) stay the same, since I overrode the get method in FileStreamSourceLog to filter out the compacted data.

Yes, it could be slow to get a batch that happens to be a compact batch. I think we have two possible solutions:

  1. Compact at the batch after the latest metadata file (as I did before); this helps most scenarios in FileStreamSource.
  2. Put this batch's own data at the beginning when compacting, so we don't need to scan the whole file to get this batch's metadata.

Both solutions need extra work; what do you think?


frreiss commented Sep 13, 2016

Ah, now I fully understand @zsxwing's earlier comment about the semantics of Source.getBatch(). Those semantics have a design flaw; see the email thread I started at http://apache-spark-developers-list.1001551.n3.nabble.com/Source-API-requires-unbounded-distributed-storage-tt18551.html. Basically, it's impossible to implement a Source to the written API spec without keeping unbounded state. I have an open PR to fix this problem at #14553.

In the short run, I think @jerryshao's changes here are OK with respect to Source.getBatch. The approach in this PR will work as long as the internal structure of the StreamExecution class doesn't change and as long as Spark does not have to recover from an outage longer than the compaction interval. The recent changes to FileStreamSource under SPARK-17165 (#14728) have the same problem, and those changes are already committed.


zsxwing commented Sep 13, 2016

Sorry, replied to the wrong PR. Deleting.


zsxwing commented Sep 13, 2016

@frreiss SPARK-17165 (#14728) uses SeenFilesMap.lastPurgeTimestamp to ignore files. When recovering from failure, SeenFilesMap.lastPurgeTimestamp is set from the files in the metadata log. File paths that are no longer stored in memory but are older than SeenFilesMap.lastPurgeTimestamp won't be processed. Therefore, it doesn't need to store unbounded state.
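
A simplified sketch of that mechanism (the real SeenFilesMap lives in FileStreamSource; the field names here follow the description above):

    class SeenFilesMapSketch(maxAgeMs: Long) {
      private val map = new java.util.HashMap[String, Long]()  // path -> mod time
      private var latestTimestamp = 0L
      private var lastPurgeTimestamp = 0L

      def add(path: String, timestamp: Long): Unit = {
        map.put(path, timestamp)
        latestTimestamp = math.max(latestTimestamp, timestamp)
      }

      // Files older than the purge watermark are treated as already processed,
      // even if they were evicted from the in-memory map.
      def isNewFile(path: String, timestamp: Long): Boolean =
        timestamp >= lastPurgeTimestamp && !map.containsKey(path)

      def purge(): Unit = {
        lastPurgeTimestamp = latestTimestamp - maxAgeMs
        val it = map.entrySet().iterator()
        while (it.hasNext) if (it.next().getValue < lastPurgeTimestamp) it.remove()
      }
    }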

Member:

Could you make VERSION a constructor parameter, in order to support changing the source or sink format separately?
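
One possible shape (illustrative only; the actual constructor in the PR differs): the version moves from a fixed VERSION constant into a constructor parameter, so the source and sink logs can evolve their formats independently:

    // Hypothetical sketch: each subclass chooses its own serialized format version.
    abstract class VersionedLogSketch[T](metadataLogVersion: String) {
      protected def serializeEntries(entries: Seq[T]): Seq[String]

      final def serialize(entries: Seq[T]): String =
        (metadataLogVersion +: serializeEntries(entries)).mkString("\n")
    }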


zsxwing commented Sep 13, 2016

@jerryshao here is a test case showing the issue with getBatch:

  test("getBatch") {
    withTempDirs { case (src, tmp) =>
      withSQLConf(
        SQLConf.FILE_SOURCE_LOG_COMPACT_INTERVAL.key -> "2",
        // Force deleting the old logs
        SQLConf.FILE_SOURCE_LOG_CLEANUP_DELAY.key -> "1"
      ) {
        val fileStream = createFileStream("text", src.getCanonicalPath)
        val filtered = fileStream.filter($"value" contains "keep")

        testStream(filtered)(
          AddTextFileData("keep1", src, tmp),
          CheckAnswer("keep1"),
          AddTextFileData("keep2", src, tmp),
          CheckAnswer("keep1", "keep2"),
          AddTextFileData("keep3", src, tmp),
          CheckAnswer("keep1", "keep2", "keep3"),
          AssertOnQuery("check getBatch") { execution: StreamExecution =>
            val _sources = PrivateMethod[Seq[Source]]('sources)
            val fileSource =
              (execution invokePrivate _sources()).head.asInstanceOf[FileStreamSource]
            assert(fileSource.getBatch(None, LongOffset(2)).as[String].collect() ===
              List("keep1", "keep2", "keep3"))
            assert(fileSource.getBatch(Some(LongOffset(0)), LongOffset(2)).as[String].collect() ===
              List("keep2", "keep3"))
            assert(fileSource.getBatch(Some(LongOffset(1)), LongOffset(2)).as[String].collect() ===
              List("keep3"))
          }
        )
      }
    }
  }


jerryshao commented Sep 14, 2016

Thanks a lot @zsxwing and @frreiss for your comments.

For the slow-scan problem with compact batches: originally I planned not to merge the latest batch (as I did before, and as suggested above), but after several different tries it is hard to implement with small changes. So for now I keep the same implementation, with a simple cache layer to overcome the problem; the basic compaction algorithm is still the same as FileStreamSinkLog's. I think it is easier to maintain.

For the broken-semantics problem: I realize it is a real problem, though the current code didn't hit it. I changed the code to scan the compacted batch files to retrieve missing batches. This is a little time-consuming, but the current logic of FileStreamSource will not touch this path.


SparkQA commented Sep 14, 2016

Test build #65365 has finished for PR 13513 at commit f9a4bcb.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • class FileStreamSinkLog(
    • case class FileEntry(path: String, timestamp: Timestamp, batchId: Long = NOT_SET)
    • class FileStreamSourceLog(


SparkQA commented Sep 14, 2016

Test build #65368 has finished for PR 13513 at commit cb4194e.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@zsxwing (Member) left a comment:

The cache idea looks good to me. For the current PR, I suggest using a new log class FileSourceLogEntry for the file source, with the following benefits:

  • Avoid storing unnecessary info for file sink log.
  • Avoid changing the format for file sink log.
  • The code will be a bit cleaner.

Member:

drop doesn't change the original map.

scala> val m = scala.collection.mutable.LinkedHashMap[Int, Int]()
m: scala.collection.mutable.LinkedHashMap[Int,Int] = Map()

scala> 

scala> m(2) = 1

scala> m
res1: scala.collection.mutable.LinkedHashMap[Int,Int] = Map(2 -> 1)

scala> m.drop(1)
res2: scala.collection.mutable.LinkedHashMap[Int,Int] = Map()

scala> m
res3: scala.collection.mutable.LinkedHashMap[Int,Int] = Map(2 -> 1)

I think it should be Java LinkedHashMap. This is an example:

private[ui] val batchTimeToOutputOpIdSparkJobIdPair =
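
For reference, a minimal sketch of the suggested pattern: a java.util.LinkedHashMap whose removeEldestEntry caps the size (cacheSize and the value type are illustrative):

    object BatchCacheSketch {
      import java.util.{LinkedHashMap => JLinkedHashMap}
      import java.util.Map.Entry

      // Unlike Scala's drop (which returns a new collection), this map mutates
      // in place and evicts the eldest batch once the cap is exceeded.
      def newBatchCache[T](cacheSize: Int): JLinkedHashMap[Long, Seq[T]] =
        new JLinkedHashMap[Long, Seq[T]] {
          override def removeEldestEntry(eldest: Entry[Long, Seq[T]]): Boolean =
            size() > cacheSize
        }
    }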

Contributor Author:

I see, sorry for this issue.

Member:

How about we use the following class for FileStreamSource?

case class FileSourceLogEntry(batchId: Long, entries: Seq[FileEntry])

I think this will make the code here simpler.

Contributor Author:

I think the parent class CompactibleFileStreamLog assumes the metadata type is Array[T], which currently suits both the file source and sink logs. If we changed to FileSourceLogEntry, the base type would have to be T rather than Array[T], which would make the two subclasses diverge.

Member:

Thanks for clarifying.


SparkQA commented Sep 18, 2016

Test build #65547 has finished for PR 13513 at commit be1abfa.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

  override def add(batchId: Long, logs: Array[FileEntry]): Boolean = {
    if (super.add(batchId, logs) && isCompactionBatch(batchId, compactInterval)) {
Member:

This is wrong. If super.add(batchId, logs) is false, then we should always return false.

Contributor Author:

Yes, you're right, I will fix it.
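
A sketch of the corrected control flow (the compaction branch's body is not shown in the diff above, so it is marked as elided here):

    override def add(batchId: Long, logs: Array[FileEntry]): Boolean = {
      if (super.add(batchId, logs)) {
        if (isCompactionBatch(batchId, compactInterval)) {
          // post-compaction bookkeeping elided (not shown in the diff)
        }
        true
      } else {
        // the write failed or the batch already exists: always report false
        false
      }
    }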

@zsxwing (Member) left a comment:

Overall LGTM. Just one minor issue.


SparkQA commented Sep 20, 2016

Test build #65628 has finished for PR 13513 at commit bddbc7f.

  • This patch fails to build.
  • This patch merges cleanly.
  • This patch adds no public classes.


SparkQA commented Sep 20, 2016

Test build #65631 has finished for PR 13513 at commit 84d3d27.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.


zsxwing commented Sep 20, 2016

LGTM. Thanks! Merging to master and 2.0.

   * old files while another one keeps retrying. Setting a reasonable cleanup delay could avoid it.
   */
  private val fileCleanupDelayMs = sparkSession.sessionState.conf.fileSinkLogCleanupDelay
  protected override val fileCleanupDelayMs =
Member:

I just noticed some conflicts here. Could you submit a follow-up PR to use the previous sparkSession.sessionState.conf.fileSinkLogCleanupDelay? Same for the other confs. This only exists in the master branch, so we don't need to fix branch-2.0.

Contributor Author:

Oh, sorry about that, I will fix it now.
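
Presumably the follow-up simply restores the conf-backed definition quoted above, along the lines of:

    // as in the diff above, but keeping the override (sketch; class context elided)
    protected override val fileCleanupDelayMs =
      sparkSession.sessionState.conf.fileSinkLogCleanupDelay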

asfgit pushed a commit that referenced this pull request Sep 20, 2016
…ataLog in FileStreamSource (branch-2.0)

## What changes were proposed in this pull request?

Backport #13513 to branch 2.0.

## How was this patch tested?

Jenkins

Author: jerryshao <[email protected]>

Closes #15163 from zsxwing/SPARK-15698-spark-2.0.
