@@ -0,0 +1,54 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.execution.streaming

import scala.util.Try

import org.apache.spark.internal.Logging
import org.apache.spark.sql.execution.datasources.CaseInsensitiveMap
import org.apache.spark.util.Utils

/**
* User-specified options for file streams.
*/
class FileStreamOptions(parameters: Map[String, String]) extends Logging {

val maxFilesPerTrigger: Option[Int] = parameters.get("maxFilesPerTrigger").map { str =>
Try(str.toInt).toOption.filter(_ > 0).getOrElse {
throw new IllegalArgumentException(
s"Invalid value '$str' for option 'maxFilesPerTrigger', must be a positive integer")
}
}

/**
* Maximum age of a file that can be found in this directory, before it is purged from the
* internal tracking state.
*
* The max age is specified with respect to the timestamp of the latest file, and not the
* timestamp of the current system. This means that if the last file has timestamp 1000, the
* current system time is 2000, and the max age is 200, the system will purge files older than
* 800 (rather than 1800) from the internal state.
*
* Defaults to one week.
*/
val maxFileAgeMs: Long =
Utils.timeStringAsMs(parameters.getOrElse("maxFileAge", "7d"))

/** Options as specified by the user, in a case-insensitive map, without "path" set. */
val optionMapWithoutPath: Map[String, String] =
new CaseInsensitiveMap(parameters).filterKeys(_ != "path")
}
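For reference, both settings are plain string options on the file source. Below is a minimal, hypothetical sketch of supplying them through the DataStreamReader API; the path and option values are placeholders, and `spark` is assumed to be an existing SparkSession.

// Illustrative only: the path and option values are placeholders, not from this PR.
val streamingText = spark.readStream
  .format("text")
  .option("maxFilesPerTrigger", "100") // parsed by FileStreamOptions.maxFilesPerTrigger
  .option("maxFileAge", "12h")         // parsed into maxFileAgeMs via Utils.timeStringAsMs
  .load("/data/incoming")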
@@ -17,21 +17,20 @@

package org.apache.spark.sql.execution.streaming

import scala.util.Try
import scala.collection.JavaConverters._

import org.apache.hadoop.fs.Path

import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.internal.Logging
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
import org.apache.spark.sql.execution.datasources.{CaseInsensitiveMap, DataSource, ListingFileCatalog, LogicalRelation}
import org.apache.spark.sql.execution.datasources.{DataSource, ListingFileCatalog, LogicalRelation}
import org.apache.spark.sql.types.StructType
import org.apache.spark.util.collection.OpenHashSet

/**
* A very simple source that reads text files from the given directory as they appear.
* A very simple source that reads files from the given directory as they appear.
*
* TODO Clean up the metadata files periodically
* TODO: Clean up the metadata log files periodically.
*/
Contributor:
This TODO still applies, right?

Contributor Author:
Put it back. Also updated the file with a new test case to make the seen map more robust.

class FileStreamSource(
sparkSession: SparkSession,
@@ -41,36 +40,62 @@ class FileStreamSource(
metadataPath: String,
options: Map[String, String]) extends Source with Logging {

private val fs = new Path(path).getFileSystem(sparkSession.sessionState.newHadoopConf())
private val qualifiedBasePath = fs.makeQualified(new Path(path)) // can contain glob patterns
private val metadataLog = new HDFSMetadataLog[Seq[String]](sparkSession, metadataPath)
import FileStreamSource._

private val sourceOptions = new FileStreamOptions(options)

private val qualifiedBasePath: Path = {
val fs = new Path(path).getFileSystem(sparkSession.sessionState.newHadoopConf())
fs.makeQualified(new Path(path)) // can contain glob patterns
}

private val metadataLog = new HDFSMetadataLog[Seq[FileEntry]](sparkSession, metadataPath)

Contributor:
Remove unnecessary blank lines; that will be more organized.

Contributor Author:
There is only one blank line. It's actually a good idea to have blank lines separating variable definitions, as documented in a lot of different coding style guides. Excessive blank lines are bad, though.

Member:
I know it's bike-shedding, but my personal taste is to keep declarations of member fields together in a block, or if there are many, perhaps group them logically. So it really depends on whether you mean for metadataLog to stand a bit apart from the other fields.

Contributor Author:
metadataLog is not logically related to qualifiedBasePath.

private var maxBatchId = metadataLog.getLatest().map(_._1).getOrElse(-1L)

/** Maximum number of new files to be considered in each batch */
private val maxFilesPerBatch = getMaxFilesPerBatch()
private val maxFilesPerBatch = sourceOptions.maxFilesPerTrigger

/** A mapping from a file that we have processed to the timestamp at which it was last modified. */
// Visible for testing and debugging in production.
val seenFiles = new SeenFilesMap(sourceOptions.maxFileAgeMs)
Contributor:
Actually, you could use scalatest's PrivateMethodTester to test a private member; it is not necessary to expose it only for testing.

Contributor Author:
This is actually useful in general for debugging. I'm going to update the comment.
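The scalatest PrivateMethodTester pattern the reviewer refers to looks roughly like the sketch below; the Counter class, bump method, and suite name are made up purely for illustration and are not part of this PR.

package org.apache.spark.sql.execution.streaming

import org.apache.spark.SparkFunSuite
import org.scalatest.PrivateMethodTester

// Hypothetical class with a private method, used only to illustrate the pattern.
class Counter {
  private def bump(n: Int): Int = n + 1
}

class PrivateAccessExampleSuite extends SparkFunSuite with PrivateMethodTester {
  test("invoke a private method without widening its visibility") {
    // PrivateMethod captures the method name; invokePrivate calls it reflectively.
    val bump = PrivateMethod[Int](Symbol("bump"))
    val counter = new Counter
    assert((counter invokePrivate bump(41)) == 42)
  }
}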


private val seenFiles = new OpenHashSet[String]
metadataLog.get(None, Some(maxBatchId)).foreach { case (batchId, files) =>
files.foreach(seenFiles.add)
metadataLog.get(None, Some(maxBatchId)).foreach { case (batchId, entry) =>
entry.foreach(seenFiles.add)
// TODO: move purge call out of the loop once we truncate logs.
seenFiles.purge()
Contributor:
From my understanding, this purge() can be moved out of this loop and done only once, since it is time-consuming.

Contributor Author:
This is not safe until we truncate the logs.

Contributor Author:
Good idea. Let me add it to the option class.

}
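For context, the restructuring discussed in the thread above would look roughly like the following hypothetical sketch once log truncation bounds the number of replayed entries; it is not code from this PR.

// Hypothetical shape once the metadata log is truncated: replay every batch
// into the seen-files map first, then purge exactly once.
metadataLog.get(None, Some(maxBatchId)).foreach { case (_, entries) =>
  entries.foreach(seenFiles.add)
}
seenFiles.purge()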

logInfo(s"maxFilesPerBatch = $maxFilesPerBatch, maxFileAge = ${sourceOptions.maxFileAgeMs}")

/**
* Returns the maximum offset that can be retrieved from the source.
*
* `synchronized` on this method is for avoiding race conditions in tests. In normal usage
* there is no race here, so contention on the lock should be rare.
*/
private def fetchMaxOffset(): LongOffset = synchronized {
val newFiles = fetchAllFiles().filter(!seenFiles.contains(_))
// All the new files found - ignore aged files and files that we have seen.
val newFiles = fetchAllFiles().filter(seenFiles.isNewFile)

// Obey user's setting to limit the number of files in this batch trigger.
val batchFiles =
if (maxFilesPerBatch.nonEmpty) newFiles.take(maxFilesPerBatch.get) else newFiles

batchFiles.foreach { file =>
seenFiles.add(file)
logDebug(s"New file: $file")
}
logTrace(s"Number of new files = ${newFiles.size})")
logTrace(s"Number of files selected for batch = ${batchFiles.size}")
logTrace(s"Number of seen files = ${seenFiles.size}")
val numPurged = seenFiles.purge()

logTrace(
s"""
|Number of new files = ${newFiles.size}
|Number of files selected for batch = ${batchFiles.size}
|Number of seen files = ${seenFiles.size}
|Number of files purged from tracking map = $numPurged
""".stripMargin)

if (batchFiles.nonEmpty) {
maxBatchId += 1
metadataLog.add(maxBatchId, batchFiles)
@@ -104,22 +129,26 @@ class FileStreamSource(
val files = metadataLog.get(Some(startId + 1), Some(endId)).flatMap(_._2)
logInfo(s"Processing ${files.length} files from ${startId + 1}:$endId")
logTrace(s"Files are:\n\t" + files.mkString("\n\t"))
val newOptions = new CaseInsensitiveMap(options).filterKeys(_ != "path")
val newDataSource =
DataSource(
sparkSession,
paths = files,
paths = files.map(_.path),
userSpecifiedSchema = Some(schema),
className = fileFormatClassName,
options = newOptions)
options = sourceOptions.optionMapWithoutPath)
Dataset.ofRows(sparkSession, LogicalRelation(newDataSource.resolveRelation()))
}

private def fetchAllFiles(): Seq[String] = {
/**
* Returns a list of files found, sorted by their timestamp.
*/
private def fetchAllFiles(): Seq[FileEntry] = {
val startTime = System.nanoTime
val globbedPaths = SparkHadoopUtil.get.globPathIfNecessary(qualifiedBasePath)
val catalog = new ListingFileCatalog(sparkSession, globbedPaths, options, Some(new StructType))
val files = catalog.allFiles().sortBy(_.getModificationTime).map(_.getPath.toUri.toString)
val files = catalog.allFiles().sortBy(_.getModificationTime).map { status =>
FileEntry(status.getPath.toUri.toString, status.getModificationTime)
}
val endTime = System.nanoTime
val listingTimeMs = (endTime.toDouble - startTime) / 1000000
if (listingTimeMs > 2000) {
@@ -132,20 +161,76 @@
files
}

private def getMaxFilesPerBatch(): Option[Int] = {
new CaseInsensitiveMap(options)
.get("maxFilesPerTrigger")
.map { str =>
Try(str.toInt).toOption.filter(_ > 0).getOrElse {
throw new IllegalArgumentException(
s"Invalid value '$str' for option 'maxFilesPerTrigger', must be a positive integer")
}
}
}

override def getOffset: Option[Offset] = Some(fetchMaxOffset()).filterNot(_.offset == -1)

override def toString: String = s"FileStreamSource[$qualifiedBasePath]"

override def stop() {}
}


object FileStreamSource {

/** Timestamp for file modification time, in ms since January 1, 1970 UTC. */
type Timestamp = Long

case class FileEntry(path: String, timestamp: Timestamp) extends Serializable

/**
* A custom hash map used to track the list of files seen. This map is not thread-safe.
*
* To prevent the hash map from growing indefinitely, a purge function is available to
* remove files more than "maxAgeMs" older than the latest file.
*/
class SeenFilesMap(maxAgeMs: Long) {
require(maxAgeMs >= 0)

/** Mapping from file to its timestamp. */
private val map = new java.util.HashMap[String, Timestamp]

/** Timestamp of the latest file. */
private var latestTimestamp: Timestamp = 0L

/** Timestamp for the last purge operation. */
private var lastPurgeTimestamp: Timestamp = 0L

/** Add a new file to the map. */
def add(file: FileEntry): Unit = {
map.put(file.path, file.timestamp)
if (file.timestamp > latestTimestamp) {
latestTimestamp = file.timestamp
}
}

/**
* Returns true if we should consider this file a new file. The file is only considered "new"
* if it is new enough that we are still tracking it, and we have not seen it before.
*/
def isNewFile(file: FileEntry): Boolean = {
// Note that we are testing against lastPurgeTimestamp here so we'd never miss a file that
// is older than (latestTimestamp - maxAgeMs) but has not been purged yet.
file.timestamp >= lastPurgeTimestamp && !map.containsKey(file.path)
}

/** Removes aged entries and returns the number of files removed. */
def purge(): Int = {
lastPurgeTimestamp = latestTimestamp - maxAgeMs
val iter = map.entrySet().iterator()
var count = 0
while (iter.hasNext) {
val entry = iter.next()
if (entry.getValue < lastPurgeTimestamp) {
count += 1
iter.remove()
}
}
count
}

def size: Int = map.size()

def allEntries: Seq[FileEntry] = {
map.entrySet().asScala.map(entry => FileEntry(entry.getKey, entry.getValue)).toSeq
}
}
}
@@ -180,7 +180,7 @@ class HDFSMetadataLog[T: ClassTag](sparkSession: SparkSession, path: String)
private def isFileAlreadyExistsException(e: IOException): Boolean = {
e.isInstanceOf[FileAlreadyExistsException] ||
// Old Hadoop versions don't throw FileAlreadyExistsException. Although it's fixed in
// HADOOP-9361, we still need to support old Hadoop versions.
// HADOOP-9361 in Hadoop 2.5, we still need to support old Hadoop versions.
(e.getMessage != null && e.getMessage.startsWith("File already exists: "))
}

@@ -0,0 +1,76 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.execution.streaming

import org.apache.spark.SparkFunSuite

class FileStreamSourceSuite extends SparkFunSuite {

import FileStreamSource._

test("SeenFilesMap") {
val map = new SeenFilesMap(maxAgeMs = 10)

map.add(FileEntry("a", 5))
assert(map.size == 1)
map.purge()
assert(map.size == 1)

// Add a new entry; purge should be a no-op, since the gap is exactly 10 ms.
map.add(FileEntry("b", 15))
assert(map.size == 2)
map.purge()
assert(map.size == 2)

// Add a new entry that's more than 10 ms newer than the first entry. We should be able to purge now.
map.add(FileEntry("c", 16))
assert(map.size == 3)
map.purge()
assert(map.size == 2)

// Overriding an existing entry shouldn't change the size
map.add(FileEntry("c", 25))
assert(map.size == 2)

// Not a new file because we have seen c before
assert(!map.isNewFile(FileEntry("c", 20)))

// Not a new file because timestamp is too old
assert(!map.isNewFile(FileEntry("d", 5)))

// Finally a new file: never seen and not too old
assert(map.isNewFile(FileEntry("e", 20)))
}

test("SeenFilesMap should only consider a file old if it is earlier than last purge time") {
val map = new SeenFilesMap(maxAgeMs = 10)

map.add(FileEntry("a", 20))
assert(map.size == 1)

// Timestamps 9 and 10 should still be considered new files because the purge time should be 0
assert(map.isNewFile(FileEntry("b", 9)))
assert(map.isNewFile(FileEntry("b", 10)))

// Once purged, the purge time should be 10, and b would then be an old file if its timestamp is less than 10.
map.purge()
assert(!map.isNewFile(FileEntry("b", 9)))
assert(map.isNewFile(FileEntry("b", 10)))
}

}