diff --git a/R/pkg/tests/run-all.R b/R/pkg/tests/run-all.R
index cefaadda6e21..29812f872c78 100644
--- a/R/pkg/tests/run-all.R
+++ b/R/pkg/tests/run-all.R
@@ -22,12 +22,13 @@ library(SparkR)
options("warn" = 2)
# Setup global test environment
+# Install Spark first to set SPARK_HOME
+install.spark()
+
sparkRDir <- file.path(Sys.getenv("SPARK_HOME"), "R")
sparkRFilesBefore <- list.files(path = sparkRDir, all.files = TRUE)
sparkRWhitelistSQLDirs <- c("spark-warehouse", "metastore_db")
invisible(lapply(sparkRWhitelistSQLDirs,
function(x) { unlink(file.path(sparkRDir, x), recursive = TRUE, force = TRUE)}))
-install.spark()
-
test_package("SparkR")
diff --git a/docs/monitoring.md b/docs/monitoring.md
index 4d0617d253b8..da954385dc45 100644
--- a/docs/monitoring.md
+++ b/docs/monitoring.md
@@ -299,12 +299,12 @@ can be identified by their `[attempt-id]`. In the API listed below, when running
/applications/[app-id]/stages |
A list of all stages for a given application. |
+
?status=[active|complete|pending|failed] list only stages in the state.
/applications/[app-id]/stages/[stage-id] |
A list of all attempts for the given stage.
-
?status=[active|complete|pending|failed] list only stages in the state.
|
diff --git a/examples/src/main/r/ml/glm.R b/examples/src/main/r/ml/glm.R
index ee13910382c5..68787f9aa9dc 100644
--- a/examples/src/main/r/ml/glm.R
+++ b/examples/src/main/r/ml/glm.R
@@ -27,7 +27,7 @@ sparkR.session(appName = "SparkR-ML-glm-example")
# $example on$
training <- read.df("data/mllib/sample_multiclass_classification_data.txt", source = "libsvm")
# Fit a generalized linear model of family "gaussian" with spark.glm
-df_list <- randomSplit(training, c(7,3), 2)
+df_list <- randomSplit(training, c(7, 3), 2)
gaussianDF <- df_list[[1]]
gaussianTestDF <- df_list[[2]]
gaussianGLM <- spark.glm(gaussianDF, label ~ features, family = "gaussian")
@@ -44,8 +44,9 @@ gaussianGLM2 <- glm(label ~ features, gaussianDF, family = "gaussian")
summary(gaussianGLM2)
# Fit a generalized linear model of family "binomial" with spark.glm
-training2 <- read.df("data/mllib/sample_binary_classification_data.txt", source = "libsvm")
-df_list2 <- randomSplit(training2, c(7,3), 2)
+training2 <- read.df("data/mllib/sample_multiclass_classification_data.txt", source = "libsvm")
+training2 <- transform(training2, label = cast(training2$label > 1, "integer"))
+df_list2 <- randomSplit(training2, c(7, 3), 2)
binomialDF <- df_list2[[1]]
binomialTestDF <- df_list2[[2]]
binomialGLM <- spark.glm(binomialDF, label ~ features, family = "binomial")
@@ -56,6 +57,15 @@ summary(binomialGLM)
# Prediction
binomialPredictions <- predict(binomialGLM, binomialTestDF)
head(binomialPredictions)
+
+# Fit a generalized linear model of family "tweedie" with spark.glm
+training3 <- read.df("data/mllib/sample_multiclass_classification_data.txt", source = "libsvm")
+tweedieDF <- transform(training3, label = training3$label * exp(randn(10)))
+tweedieGLM <- spark.glm(tweedieDF, label ~ features, family = "tweedie",
+ var.power = 1.2, link.power = 0)
+
+# Model summary
+summary(tweedieGLM)
# $example off$
sparkR.session.stop()
diff --git a/mllib/src/main/scala/org/apache/spark/ml/stat/Correlation.scala b/mllib/src/main/scala/org/apache/spark/ml/stat/Correlation.scala
index d3c84b77d26a..e185bc8a6faa 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/stat/Correlation.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/stat/Correlation.scala
@@ -38,7 +38,7 @@ object Correlation {
/**
* :: Experimental ::
- * Compute the correlation matrix for the input RDD of Vectors using the specified method.
+ * Compute the correlation matrix for the input Dataset of Vectors using the specified method.
* Methods currently supported: `pearson` (default), `spearman`.
*
* @param dataset A dataset or a dataframe
@@ -56,14 +56,14 @@ object Correlation {
* Here is how to access the correlation coefficient:
* {{{
* val data: Dataset[Vector] = ...
- * val Row(coeff: Matrix) = Statistics.corr(data, "value").head
+ * val Row(coeff: Matrix) = Correlation.corr(data, "value").head
* // coeff now contains the Pearson correlation matrix.
* }}}
*
* @note For Spearman, a rank correlation, we need to create an RDD[Double] for each column
* and sort it in order to retrieve the ranks and then join the columns back into an RDD[Vector],
- * which is fairly costly. Cache the input RDD before calling corr with `method = "spearman"` to
- * avoid recomputing the common lineage.
+ * which is fairly costly. Cache the input Dataset before calling corr with `method = "spearman"`
+ * to avoid recomputing the common lineage.
*/
@Since("2.2.0")
def corr(dataset: Dataset[_], column: String, method: String): DataFrame = {
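The Scaladoc above only shows the one-line access pattern; below is a fuller, self-contained usage sketch of the `Correlation.corr` API touched by this patch. It mirrors the data used in the Python doctest later in this diff; the object name and the session setup are illustrative, not part of the change.

```scala
import org.apache.spark.ml.linalg.{Matrix, Vectors}
import org.apache.spark.ml.stat.Correlation
import org.apache.spark.sql.{Row, SparkSession}

object CorrelationSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("CorrelationSketch").getOrCreate()
    import spark.implicits._

    // The same column of Vectors as in the Python doctest in this patch.
    val df = Seq(
      Vectors.dense(1.0, 0.0, 0.0, -2.0),
      Vectors.dense(4.0, 5.0, 0.0, 3.0),
      Vectors.dense(6.0, 7.0, 0.0, 8.0),
      Vectors.dense(9.0, 0.0, 0.0, 1.0)
    ).map(Tuple1.apply).toDF("features")

    // Pearson (the default) correlation matrix of the "features" column.
    val Row(pearson: Matrix) = Correlation.corr(df, "features").head()
    println(s"Pearson correlation matrix:\n$pearson")

    // Spearman rank correlation; cache first to avoid recomputing the lineage, as noted above.
    df.cache()
    val Row(spearman: Matrix) = Correlation.corr(df, "features", "spearman").head()
    println(s"Spearman correlation matrix:\n$spearman")

    spark.stop()
  }
}
```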
diff --git a/python/pyspark/ml/stat.py b/python/pyspark/ml/stat.py
index db043ff68fec..079b0833e1c6 100644
--- a/python/pyspark/ml/stat.py
+++ b/python/pyspark/ml/stat.py
@@ -71,6 +71,67 @@ def test(dataset, featuresCol, labelCol):
return _java2py(sc, javaTestObj.test(*args))
+class Correlation(object):
+ """
+ .. note:: Experimental
+
+ Compute the correlation matrix for the input dataset of Vectors using the specified method.
+ Methods currently supported: `pearson` (default), `spearman`.
+
+ .. note:: For Spearman, a rank correlation, we need to create an RDD[Double] for each column
+ and sort it in order to retrieve the ranks and then join the columns back into an RDD[Vector],
+ which is fairly costly. Cache the input Dataset before calling corr with `method = 'spearman'`
+ to avoid recomputing the common lineage.
+
+ :param dataset:
+ A dataset or a dataframe.
+ :param column:
+ The name of the column of vectors for which the correlation coefficient needs
+ to be computed. This must be a column of the dataset, and it must contain
+ Vector objects.
+ :param method:
+ String specifying the method to use for computing correlation.
+ Supported: `pearson` (default), `spearman`.
+ :return:
+ A dataframe that contains the correlation matrix of the column of vectors. This
+ dataframe contains a single row and a single column of name
+ '$METHODNAME($COLUMN)'.
+
+ >>> from pyspark.ml.linalg import Vectors
+ >>> from pyspark.ml.stat import Correlation
+ >>> dataset = [[Vectors.dense([1, 0, 0, -2])],
+ ... [Vectors.dense([4, 5, 0, 3])],
+ ... [Vectors.dense([6, 7, 0, 8])],
+ ... [Vectors.dense([9, 0, 0, 1])]]
+ >>> dataset = spark.createDataFrame(dataset, ['features'])
+ >>> pearsonCorr = Correlation.corr(dataset, 'features', 'pearson').collect()[0][0]
+ >>> print(str(pearsonCorr).replace('nan', 'NaN'))
+ DenseMatrix([[ 1. , 0.0556..., NaN, 0.4004...],
+ [ 0.0556..., 1. , NaN, 0.9135...],
+ [ NaN, NaN, 1. , NaN],
+ [ 0.4004..., 0.9135..., NaN, 1. ]])
+ >>> spearmanCorr = Correlation.corr(dataset, 'features', method='spearman').collect()[0][0]
+ >>> print(str(spearmanCorr).replace('nan', 'NaN'))
+ DenseMatrix([[ 1. , 0.1054..., NaN, 0.4 ],
+ [ 0.1054..., 1. , NaN, 0.9486... ],
+ [ NaN, NaN, 1. , NaN],
+ [ 0.4 , 0.9486... , NaN, 1. ]])
+
+ .. versionadded:: 2.2.0
+
+ """
+ @staticmethod
+ @since("2.2.0")
+ def corr(dataset, column, method="pearson"):
+ """
+        Compute the correlation matrix for the given dataset using the specified method.
+ """
+ sc = SparkContext._active_spark_context
+ javaCorrObj = _jvm().org.apache.spark.ml.stat.Correlation
+ args = [_py2java(sc, arg) for arg in (dataset, column, method)]
+ return _java2py(sc, javaCorrObj.corr(*args))
+
+
if __name__ == "__main__":
import doctest
import pyspark.ml.stat
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
index dc2e40424fd5..360e55d92282 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
@@ -27,7 +27,7 @@ import com.google.common.base.Objects
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.{FunctionIdentifier, InternalRow, TableIdentifier}
import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
-import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, Cast, Literal}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, AttributeReference, Cast, Literal}
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeUtils}
import org.apache.spark.sql.catalyst.util.quoteIdentifier
@@ -403,14 +403,14 @@ object CatalogTypes {
*/
case class CatalogRelation(
tableMeta: CatalogTable,
- dataCols: Seq[Attribute],
- partitionCols: Seq[Attribute]) extends LeafNode with MultiInstanceRelation {
+ dataCols: Seq[AttributeReference],
+ partitionCols: Seq[AttributeReference]) extends LeafNode with MultiInstanceRelation {
assert(tableMeta.identifier.database.isDefined)
assert(tableMeta.partitionSchema.sameType(partitionCols.toStructType))
assert(tableMeta.dataSchema.sameType(dataCols.toStructType))
// The partition column should always appear after data columns.
- override def output: Seq[Attribute] = dataCols ++ partitionCols
+ override def output: Seq[AttributeReference] = dataCols ++ partitionCols
def isPartitioned: Boolean = partitionCols.nonEmpty
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index e685c2bed50a..640c0f189c23 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -1153,7 +1153,7 @@ class SQLConf extends Serializable with Logging {
}
// For test only
- private[spark] def copy(entries: (ConfigEntry[_], Any)*): SQLConf = {
+ def copy(entries: (ConfigEntry[_], Any)*): SQLConf = {
val cloned = clone()
entries.foreach {
case (entry, value) => cloned.setConfString(entry.key, value.toString)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
index e5c7c383d708..2d83d512e702 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
@@ -231,16 +231,17 @@ class FindDataSourceTable(sparkSession: SparkSession) extends Rule[LogicalPlan]
options = table.storage.properties ++ pathOption,
catalogTable = Some(table))
- LogicalRelation(
- dataSource.resolveRelation(checkFilesExist = false),
- catalogTable = Some(table))
+ LogicalRelation(dataSource.resolveRelation(checkFilesExist = false), table)
}
}).asInstanceOf[LogicalRelation]
- // It's possible that the table schema is empty and need to be inferred at runtime. We should
- // not specify expected outputs for this case.
- val expectedOutputs = if (r.output.isEmpty) None else Some(r.output)
- plan.copy(expectedOutputAttributes = expectedOutputs)
+ if (r.output.isEmpty) {
+      // It's possible that the table schema is empty and needs to be inferred at runtime. In that
+      // case, we don't need to change the output of the cached plan.
+ plan
+ } else {
+ plan.copy(output = r.output)
+ }
}
override def apply(plan: LogicalPlan): LogicalPlan = plan transform {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala
index ee4d0863d977..11605dd28056 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala
@@ -17,12 +17,19 @@
package org.apache.spark.sql.execution.datasources
+import java.io.FileNotFoundException
+
import scala.collection.mutable
+import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs._
+import org.apache.hadoop.mapred.{FileInputFormat, JobConf}
+import org.apache.spark.internal.Logging
+import org.apache.spark.metrics.source.HiveCatalogMetrics
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.StructType
+import org.apache.spark.util.SerializableConfiguration
/**
@@ -84,4 +91,223 @@ class InMemoryFileIndex(
}
override def hashCode(): Int = rootPaths.toSet.hashCode()
+
+ /**
+   * Lists the leaf files of the given paths. This method will submit a Spark job to do parallel
+   * listing whenever a path has more files than the parallel partition discovery threshold.
+ *
+ * This is publicly visible for testing.
+ */
+ def listLeafFiles(paths: Seq[Path]): mutable.LinkedHashSet[FileStatus] = {
+ val output = mutable.LinkedHashSet[FileStatus]()
+ val pathsToFetch = mutable.ArrayBuffer[Path]()
+ for (path <- paths) {
+ fileStatusCache.getLeafFiles(path) match {
+ case Some(files) =>
+ HiveCatalogMetrics.incrementFileCacheHits(files.length)
+ output ++= files
+ case None =>
+ pathsToFetch += path
+ }
+ }
+ val filter = FileInputFormat.getInputPathFilter(new JobConf(hadoopConf, this.getClass))
+ val discovered = InMemoryFileIndex.bulkListLeafFiles(
+ pathsToFetch, hadoopConf, filter, sparkSession)
+ discovered.foreach { case (path, leafFiles) =>
+ HiveCatalogMetrics.incrementFilesDiscovered(leafFiles.size)
+ fileStatusCache.putLeafFiles(path, leafFiles.toArray)
+ output ++= leafFiles
+ }
+ output
+ }
+}
+
+object InMemoryFileIndex extends Logging {
+
+ /** A serializable variant of HDFS's BlockLocation. */
+ private case class SerializableBlockLocation(
+ names: Array[String],
+ hosts: Array[String],
+ offset: Long,
+ length: Long)
+
+ /** A serializable variant of HDFS's FileStatus. */
+ private case class SerializableFileStatus(
+ path: String,
+ length: Long,
+ isDir: Boolean,
+ blockReplication: Short,
+ blockSize: Long,
+ modificationTime: Long,
+ accessTime: Long,
+ blockLocations: Array[SerializableBlockLocation])
+
+ /**
+ * Lists a collection of paths recursively. Picks the listing strategy adaptively depending
+ * on the number of paths to list.
+ *
+ * This may only be called on the driver.
+ *
+ * @return for each input path, the set of discovered files for the path
+ */
+ private def bulkListLeafFiles(
+ paths: Seq[Path],
+ hadoopConf: Configuration,
+ filter: PathFilter,
+ sparkSession: SparkSession): Seq[(Path, Seq[FileStatus])] = {
+
+ // Short-circuits parallel listing when serial listing is likely to be faster.
+ if (paths.size <= sparkSession.sessionState.conf.parallelPartitionDiscoveryThreshold) {
+ return paths.map { path =>
+ (path, listLeafFiles(path, hadoopConf, filter, Some(sparkSession)))
+ }
+ }
+
+ logInfo(s"Listing leaf files and directories in parallel under: ${paths.mkString(", ")}")
+ HiveCatalogMetrics.incrementParallelListingJobCount(1)
+
+ val sparkContext = sparkSession.sparkContext
+ val serializableConfiguration = new SerializableConfiguration(hadoopConf)
+ val serializedPaths = paths.map(_.toString)
+ val parallelPartitionDiscoveryParallelism =
+ sparkSession.sessionState.conf.parallelPartitionDiscoveryParallelism
+
+    // Limit the parallelism so that the following file listing does not generate too many tasks
+    // when defaultParallelism is large.
+ val numParallelism = Math.min(paths.size, parallelPartitionDiscoveryParallelism)
+
+ val statusMap = sparkContext
+ .parallelize(serializedPaths, numParallelism)
+ .mapPartitions { pathStrings =>
+ val hadoopConf = serializableConfiguration.value
+ pathStrings.map(new Path(_)).toSeq.map { path =>
+ (path, listLeafFiles(path, hadoopConf, filter, None))
+ }.iterator
+ }.map { case (path, statuses) =>
+ val serializableStatuses = statuses.map { status =>
+ // Turn FileStatus into SerializableFileStatus so we can send it back to the driver
+ val blockLocations = status match {
+ case f: LocatedFileStatus =>
+ f.getBlockLocations.map { loc =>
+ SerializableBlockLocation(
+ loc.getNames,
+ loc.getHosts,
+ loc.getOffset,
+ loc.getLength)
+ }
+
+ case _ =>
+ Array.empty[SerializableBlockLocation]
+ }
+
+ SerializableFileStatus(
+ status.getPath.toString,
+ status.getLen,
+ status.isDirectory,
+ status.getReplication,
+ status.getBlockSize,
+ status.getModificationTime,
+ status.getAccessTime,
+ blockLocations)
+ }
+ (path.toString, serializableStatuses)
+ }.collect()
+
+    // Turn SerializableFileStatus back into FileStatus
+ statusMap.map { case (path, serializableStatuses) =>
+ val statuses = serializableStatuses.map { f =>
+ val blockLocations = f.blockLocations.map { loc =>
+ new BlockLocation(loc.names, loc.hosts, loc.offset, loc.length)
+ }
+ new LocatedFileStatus(
+ new FileStatus(
+ f.length, f.isDir, f.blockReplication, f.blockSize, f.modificationTime,
+ new Path(f.path)),
+ blockLocations)
+ }
+ (new Path(path), statuses)
+ }
+ }
+
+ /**
+ * Lists a single filesystem path recursively. If a SparkSession object is specified, this
+ * function may launch Spark jobs to parallelize listing.
+ *
+ * If sessionOpt is None, this may be called on executors.
+ *
+ * @return all children of path that match the specified filter.
+ */
+ private def listLeafFiles(
+ path: Path,
+ hadoopConf: Configuration,
+ filter: PathFilter,
+ sessionOpt: Option[SparkSession]): Seq[FileStatus] = {
+ logTrace(s"Listing $path")
+ val fs = path.getFileSystem(hadoopConf)
+ val name = path.getName.toLowerCase
+
+ // [SPARK-17599] Prevent InMemoryFileIndex from failing if path doesn't exist
+ // Note that statuses only include FileStatus for the files and dirs directly under path,
+    // and do not include anything else recursively.
+ val statuses = try fs.listStatus(path) catch {
+ case _: FileNotFoundException =>
+ logWarning(s"The directory $path was not found. Was it deleted very recently?")
+ Array.empty[FileStatus]
+ }
+
+ val filteredStatuses = statuses.filterNot(status => shouldFilterOut(status.getPath.getName))
+
+ val allLeafStatuses = {
+ val (dirs, topLevelFiles) = filteredStatuses.partition(_.isDirectory)
+ val nestedFiles: Seq[FileStatus] = sessionOpt match {
+ case Some(session) =>
+ bulkListLeafFiles(dirs.map(_.getPath), hadoopConf, filter, session).flatMap(_._2)
+ case _ =>
+ dirs.flatMap(dir => listLeafFiles(dir.getPath, hadoopConf, filter, sessionOpt))
+ }
+ val allFiles = topLevelFiles ++ nestedFiles
+ if (filter != null) allFiles.filter(f => filter.accept(f.getPath)) else allFiles
+ }
+
+ allLeafStatuses.filterNot(status => shouldFilterOut(status.getPath.getName)).map {
+ case f: LocatedFileStatus =>
+ f
+
+ // NOTE:
+ //
+      // - Although the S3/S3A/S3N file systems can be quite slow for remote file metadata
+ // operations, calling `getFileBlockLocations` does no harm here since these file system
+ // implementations don't actually issue RPC for this method.
+ //
+      // - Here we call `getFileBlockLocations` sequentially, but that should not be a big deal
+      //   since parallel listing (`bulkListLeafFiles`) is always used when the number of paths
+      //   exceeds the threshold.
+ case f =>
+ // The other constructor of LocatedFileStatus will call FileStatus.getPermission(),
+        // which is very slow on some file systems (RawLocalFileSystem, which launches a
+        // subprocess and parses the stdout).
+ val locations = fs.getFileBlockLocations(f, 0, f.getLen)
+ val lfs = new LocatedFileStatus(f.getLen, f.isDirectory, f.getReplication, f.getBlockSize,
+ f.getModificationTime, 0, null, null, null, null, f.getPath, locations)
+ if (f.isSymlink) {
+ lfs.setSymlink(f.getSymlink)
+ }
+ lfs
+ }
+ }
+
+ /** Checks if we should filter out this path name. */
+ def shouldFilterOut(pathName: String): Boolean = {
+    // We filter out the following paths:
+    // 1. everything that starts with _ or ., except _common_metadata and _metadata,
+    //    because Parquet needs to find those metadata files among the leaf files returned by this
+    //    method. We should refactor this logic to not mix metadata files with data files.
+    // 2. everything that ends with `._COPYING_`, because this is an intermediate state of a file
+    //    that we should skip to avoid reading it twice.
+ val exclude = (pathName.startsWith("_") && !pathName.contains("=")) ||
+ pathName.startsWith(".") || pathName.endsWith("._COPYING_")
+ val include = pathName.startsWith("_common_metadata") || pathName.startsWith("_metadata")
+ exclude && !include
+ }
}
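As a quick illustration of the filtering rule documented in `shouldFilterOut`, here is a minimal standalone sketch that mirrors the same predicate and runs it over a few sample names (the object name `PathNameFilterSketch` is illustrative and not part of this patch):

```scala
// Standalone mirror of the path-name filtering rule documented above.
object PathNameFilterSketch {
  def shouldFilterOut(pathName: String): Boolean = {
    // Hide names starting with "_" or "." and in-flight "._COPYING_" files,
    // but keep Parquet's _metadata / _common_metadata summary files.
    val exclude = (pathName.startsWith("_") && !pathName.contains("=")) ||
      pathName.startsWith(".") || pathName.endsWith("._COPYING_")
    val include = pathName.startsWith("_common_metadata") || pathName.startsWith("_metadata")
    exclude && !include
  }

  def main(args: Array[String]): Unit = {
    Seq("abcd", ".ab", "_cd", "_metadata", "_common_metadata", "a._COPYING_", "p=1")
      .foreach(name => println(f"$name%-20s filtered out: ${shouldFilterOut(name)}"))
  }
}
```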
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala
index 3b14b794fd08..421520396007 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala
@@ -18,7 +18,7 @@ package org.apache.spark.sql.execution.datasources
import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
import org.apache.spark.sql.catalyst.catalog.CatalogTable
-import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, AttributeReference}
+import org.apache.spark.sql.catalyst.expressions.{AttributeMap, AttributeReference}
import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, Statistics}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.sources.BaseRelation
@@ -26,31 +26,13 @@ import org.apache.spark.util.Utils
/**
* Used to link a [[BaseRelation]] in to a logical query plan.
- *
- * Note that sometimes we need to use `LogicalRelation` to replace an existing leaf node without
- * changing the output attributes' IDs. The `expectedOutputAttributes` parameter is used for
- * this purpose. See https://issues.apache.org/jira/browse/SPARK-10741 for more details.
*/
case class LogicalRelation(
relation: BaseRelation,
- expectedOutputAttributes: Option[Seq[Attribute]] = None,
- catalogTable: Option[CatalogTable] = None)
+ output: Seq[AttributeReference],
+ catalogTable: Option[CatalogTable])
extends LeafNode with MultiInstanceRelation {
- override val output: Seq[AttributeReference] = {
- val attrs = relation.schema.toAttributes
- expectedOutputAttributes.map { expectedAttrs =>
- assert(expectedAttrs.length == attrs.length)
- attrs.zip(expectedAttrs).map {
- // We should respect the attribute names provided by base relation and only use the
- // exprId in `expectedOutputAttributes`.
- // The reason is that, some relations(like parquet) will reconcile attribute names to
- // workaround case insensitivity issue.
- case (attr, expected) => attr.withExprId(expected.exprId)
- }
- }.getOrElse(attrs)
- }
-
// Logical Relations are distinct if they have different output for the sake of transformations.
override def equals(other: Any): Boolean = other match {
case l @ LogicalRelation(otherRelation, _, _) => relation == otherRelation && output == l.output
@@ -87,11 +69,8 @@ case class LogicalRelation(
* unique expression ids. We respect the `expectedOutputAttributes` and create
* new instances of attributes in it.
*/
- override def newInstance(): this.type = {
- LogicalRelation(
- relation,
- expectedOutputAttributes.map(_.map(_.newInstance())),
- catalogTable).asInstanceOf[this.type]
+ override def newInstance(): LogicalRelation = {
+ this.copy(output = output.map(_.newInstance()))
}
override def refresh(): Unit = relation match {
@@ -101,3 +80,11 @@ case class LogicalRelation(
override def simpleString: String = s"Relation[${Utils.truncatedString(output, ",")}] $relation"
}
+
+object LogicalRelation {
+ def apply(relation: BaseRelation): LogicalRelation =
+ LogicalRelation(relation, relation.schema.toAttributes, None)
+
+ def apply(relation: BaseRelation, table: CatalogTable): LogicalRelation =
+ LogicalRelation(relation, relation.schema.toAttributes, Some(table))
+}
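For reference, a minimal sketch of how callers construct the simplified `LogicalRelation` after this change, assuming a trivial `BaseRelation` defined inline for illustration (the object name and the local session setup are not part of the patch):

```scala
import org.apache.spark.sql.{SQLContext, SparkSession}
import org.apache.spark.sql.execution.datasources.LogicalRelation
import org.apache.spark.sql.sources.BaseRelation
import org.apache.spark.sql.types.{IntegerType, StructField, StructType}

object LogicalRelationSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[1]").appName("LogicalRelationSketch").getOrCreate()

    // A trivial relation with a one-column schema, used only for illustration.
    val rel = new BaseRelation {
      override def sqlContext: SQLContext = spark.sqlContext
      override def schema: StructType = StructType(Seq(StructField("i", IntegerType)))
    }

    // The output attributes are now derived once from the relation's schema by the
    // companion-object constructor, instead of being recomputed in the class body.
    val plan = LogicalRelation(rel)

    // newInstance() keeps the attribute names but assigns fresh expression ids.
    val fresh = plan.newInstance()
    println(plan.output.map(a => s"${a.name}#${a.exprId.id}").mkString(", "))
    println(fresh.output.map(a => s"${a.name}#${a.exprId.id}").mkString(", "))

    spark.stop()
  }
}
```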
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala
index 71500a010581..ffd7f6c750f8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala
@@ -17,22 +17,17 @@
package org.apache.spark.sql.execution.datasources
-import java.io.FileNotFoundException
-
import scala.collection.mutable
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs._
-import org.apache.hadoop.mapred.{FileInputFormat, JobConf}
import org.apache.spark.internal.Logging
-import org.apache.spark.metrics.source.HiveCatalogMetrics
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.{expressions, InternalRow}
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeUtils}
import org.apache.spark.sql.types.{StringType, StructType}
-import org.apache.spark.util.SerializableConfiguration
/**
* An abstract class that represents [[FileIndex]]s that are aware of partitioned tables.
@@ -241,224 +236,8 @@ abstract class PartitioningAwareFileIndex(
val name = path.getName
!((name.startsWith("_") && !name.contains("=")) || name.startsWith("."))
}
-
- /**
- * List leaf files of given paths. This method will submit a Spark job to do parallel
- * listing whenever there is a path having more files than the parallel partition discovery
- * discovery threshold.
- *
- * This is publicly visible for testing.
- */
- def listLeafFiles(paths: Seq[Path]): mutable.LinkedHashSet[FileStatus] = {
- val output = mutable.LinkedHashSet[FileStatus]()
- val pathsToFetch = mutable.ArrayBuffer[Path]()
- for (path <- paths) {
- fileStatusCache.getLeafFiles(path) match {
- case Some(files) =>
- HiveCatalogMetrics.incrementFileCacheHits(files.length)
- output ++= files
- case None =>
- pathsToFetch += path
- }
- }
- val filter = FileInputFormat.getInputPathFilter(new JobConf(hadoopConf, this.getClass))
- val discovered = PartitioningAwareFileIndex.bulkListLeafFiles(
- pathsToFetch, hadoopConf, filter, sparkSession)
- discovered.foreach { case (path, leafFiles) =>
- HiveCatalogMetrics.incrementFilesDiscovered(leafFiles.size)
- fileStatusCache.putLeafFiles(path, leafFiles.toArray)
- output ++= leafFiles
- }
- output
- }
}
-object PartitioningAwareFileIndex extends Logging {
+object PartitioningAwareFileIndex {
val BASE_PATH_PARAM = "basePath"
-
- /** A serializable variant of HDFS's BlockLocation. */
- private case class SerializableBlockLocation(
- names: Array[String],
- hosts: Array[String],
- offset: Long,
- length: Long)
-
- /** A serializable variant of HDFS's FileStatus. */
- private case class SerializableFileStatus(
- path: String,
- length: Long,
- isDir: Boolean,
- blockReplication: Short,
- blockSize: Long,
- modificationTime: Long,
- accessTime: Long,
- blockLocations: Array[SerializableBlockLocation])
-
- /**
- * Lists a collection of paths recursively. Picks the listing strategy adaptively depending
- * on the number of paths to list.
- *
- * This may only be called on the driver.
- *
- * @return for each input path, the set of discovered files for the path
- */
- private def bulkListLeafFiles(
- paths: Seq[Path],
- hadoopConf: Configuration,
- filter: PathFilter,
- sparkSession: SparkSession): Seq[(Path, Seq[FileStatus])] = {
-
- // Short-circuits parallel listing when serial listing is likely to be faster.
- if (paths.size <= sparkSession.sessionState.conf.parallelPartitionDiscoveryThreshold) {
- return paths.map { path =>
- (path, listLeafFiles(path, hadoopConf, filter, Some(sparkSession)))
- }
- }
-
- logInfo(s"Listing leaf files and directories in parallel under: ${paths.mkString(", ")}")
- HiveCatalogMetrics.incrementParallelListingJobCount(1)
-
- val sparkContext = sparkSession.sparkContext
- val serializableConfiguration = new SerializableConfiguration(hadoopConf)
- val serializedPaths = paths.map(_.toString)
- val parallelPartitionDiscoveryParallelism =
- sparkSession.sessionState.conf.parallelPartitionDiscoveryParallelism
-
- // Set the number of parallelism to prevent following file listing from generating many tasks
- // in case of large #defaultParallelism.
- val numParallelism = Math.min(paths.size, parallelPartitionDiscoveryParallelism)
-
- val statusMap = sparkContext
- .parallelize(serializedPaths, numParallelism)
- .mapPartitions { pathStrings =>
- val hadoopConf = serializableConfiguration.value
- pathStrings.map(new Path(_)).toSeq.map { path =>
- (path, listLeafFiles(path, hadoopConf, filter, None))
- }.iterator
- }.map { case (path, statuses) =>
- val serializableStatuses = statuses.map { status =>
- // Turn FileStatus into SerializableFileStatus so we can send it back to the driver
- val blockLocations = status match {
- case f: LocatedFileStatus =>
- f.getBlockLocations.map { loc =>
- SerializableBlockLocation(
- loc.getNames,
- loc.getHosts,
- loc.getOffset,
- loc.getLength)
- }
-
- case _ =>
- Array.empty[SerializableBlockLocation]
- }
-
- SerializableFileStatus(
- status.getPath.toString,
- status.getLen,
- status.isDirectory,
- status.getReplication,
- status.getBlockSize,
- status.getModificationTime,
- status.getAccessTime,
- blockLocations)
- }
- (path.toString, serializableStatuses)
- }.collect()
-
- // turn SerializableFileStatus back to Status
- statusMap.map { case (path, serializableStatuses) =>
- val statuses = serializableStatuses.map { f =>
- val blockLocations = f.blockLocations.map { loc =>
- new BlockLocation(loc.names, loc.hosts, loc.offset, loc.length)
- }
- new LocatedFileStatus(
- new FileStatus(
- f.length, f.isDir, f.blockReplication, f.blockSize, f.modificationTime,
- new Path(f.path)),
- blockLocations)
- }
- (new Path(path), statuses)
- }
- }
-
- /**
- * Lists a single filesystem path recursively. If a SparkSession object is specified, this
- * function may launch Spark jobs to parallelize listing.
- *
- * If sessionOpt is None, this may be called on executors.
- *
- * @return all children of path that match the specified filter.
- */
- private def listLeafFiles(
- path: Path,
- hadoopConf: Configuration,
- filter: PathFilter,
- sessionOpt: Option[SparkSession]): Seq[FileStatus] = {
- logTrace(s"Listing $path")
- val fs = path.getFileSystem(hadoopConf)
- val name = path.getName.toLowerCase
-
- // [SPARK-17599] Prevent InMemoryFileIndex from failing if path doesn't exist
- // Note that statuses only include FileStatus for the files and dirs directly under path,
- // and does not include anything else recursively.
- val statuses = try fs.listStatus(path) catch {
- case _: FileNotFoundException =>
- logWarning(s"The directory $path was not found. Was it deleted very recently?")
- Array.empty[FileStatus]
- }
-
- val filteredStatuses = statuses.filterNot(status => shouldFilterOut(status.getPath.getName))
-
- val allLeafStatuses = {
- val (dirs, topLevelFiles) = filteredStatuses.partition(_.isDirectory)
- val nestedFiles: Seq[FileStatus] = sessionOpt match {
- case Some(session) =>
- bulkListLeafFiles(dirs.map(_.getPath), hadoopConf, filter, session).flatMap(_._2)
- case _ =>
- dirs.flatMap(dir => listLeafFiles(dir.getPath, hadoopConf, filter, sessionOpt))
- }
- val allFiles = topLevelFiles ++ nestedFiles
- if (filter != null) allFiles.filter(f => filter.accept(f.getPath)) else allFiles
- }
-
- allLeafStatuses.filterNot(status => shouldFilterOut(status.getPath.getName)).map {
- case f: LocatedFileStatus =>
- f
-
- // NOTE:
- //
- // - Although S3/S3A/S3N file system can be quite slow for remote file metadata
- // operations, calling `getFileBlockLocations` does no harm here since these file system
- // implementations don't actually issue RPC for this method.
- //
- // - Here we are calling `getFileBlockLocations` in a sequential manner, but it should not
- // be a big deal since we always use to `listLeafFilesInParallel` when the number of
- // paths exceeds threshold.
- case f =>
- // The other constructor of LocatedFileStatus will call FileStatus.getPermission(),
- // which is very slow on some file system (RawLocalFileSystem, which is launch a
- // subprocess and parse the stdout).
- val locations = fs.getFileBlockLocations(f, 0, f.getLen)
- val lfs = new LocatedFileStatus(f.getLen, f.isDirectory, f.getReplication, f.getBlockSize,
- f.getModificationTime, 0, null, null, null, null, f.getPath, locations)
- if (f.isSymlink) {
- lfs.setSymlink(f.getSymlink)
- }
- lfs
- }
- }
-
- /** Checks if we should filter out this path name. */
- def shouldFilterOut(pathName: String): Boolean = {
- // We filter follow paths:
- // 1. everything that starts with _ and ., except _common_metadata and _metadata
- // because Parquet needs to find those metadata files from leaf files returned by this method.
- // We should refactor this logic to not mix metadata files with data files.
- // 2. everything that ends with `._COPYING_`, because this is a intermediate state of file. we
- // should skip this file in case of double reading.
- val exclude = (pathName.startsWith("_") && !pathName.contains("=")) ||
- pathName.startsWith(".") || pathName.endsWith("._COPYING_")
- val include = pathName.startsWith("_common_metadata") || pathName.startsWith("_metadata")
- exclude && !include
- }
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitions.scala
index 8566a8061034..905b8683e10b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitions.scala
@@ -59,9 +59,7 @@ private[sql] object PruneFileSourcePartitions extends Rule[LogicalPlan] {
val prunedFileIndex = catalogFileIndex.filterPartitions(partitionKeyFilters.toSeq)
val prunedFsRelation =
fsRelation.copy(location = prunedFileIndex)(sparkSession)
- val prunedLogicalRelation = logicalRelation.copy(
- relation = prunedFsRelation,
- expectedOutputAttributes = Some(logicalRelation.output))
+ val prunedLogicalRelation = logicalRelation.copy(relation = prunedFsRelation)
// Keep partition-pruning predicates so that they are visible in physical planning
val filterExpression = filters.reduceLeft(And)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala
index 7ea406492757..00f5d5db8f5f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala
@@ -135,15 +135,15 @@ class FileIndexSuite extends SharedSQLContext {
}
}
- test("PartitioningAwareFileIndex - file filtering") {
- assert(!PartitioningAwareFileIndex.shouldFilterOut("abcd"))
- assert(PartitioningAwareFileIndex.shouldFilterOut(".ab"))
- assert(PartitioningAwareFileIndex.shouldFilterOut("_cd"))
- assert(!PartitioningAwareFileIndex.shouldFilterOut("_metadata"))
- assert(!PartitioningAwareFileIndex.shouldFilterOut("_common_metadata"))
- assert(PartitioningAwareFileIndex.shouldFilterOut("_ab_metadata"))
- assert(PartitioningAwareFileIndex.shouldFilterOut("_cd_common_metadata"))
- assert(PartitioningAwareFileIndex.shouldFilterOut("a._COPYING_"))
+ test("InMemoryFileIndex - file filtering") {
+ assert(!InMemoryFileIndex.shouldFilterOut("abcd"))
+ assert(InMemoryFileIndex.shouldFilterOut(".ab"))
+ assert(InMemoryFileIndex.shouldFilterOut("_cd"))
+ assert(!InMemoryFileIndex.shouldFilterOut("_metadata"))
+ assert(!InMemoryFileIndex.shouldFilterOut("_common_metadata"))
+ assert(InMemoryFileIndex.shouldFilterOut("_ab_metadata"))
+ assert(InMemoryFileIndex.shouldFilterOut("_cd_common_metadata"))
+ assert(InMemoryFileIndex.shouldFilterOut("a._COPYING_"))
}
test("SPARK-17613 - PartitioningAwareFileIndex: base path w/o '/' at end") {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/PathOptionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/PathOptionSuite.scala
index 60adee4599b0..6dd4847ead73 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/PathOptionSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/PathOptionSuite.scala
@@ -75,13 +75,13 @@ class PathOptionSuite extends DataSourceTest with SharedSQLContext {
|USING ${classOf[TestOptionsSource].getCanonicalName}
|OPTIONS (PATH '/tmp/path')
""".stripMargin)
- assert(getPathOption("src") == Some("file:/tmp/path"))
+ assert(getPathOption("src").map(makeQualifiedPath) == Some(makeQualifiedPath("/tmp/path")))
}
// should exist even path option is not specified when creating table
withTable("src") {
sql(s"CREATE TABLE src(i int) USING ${classOf[TestOptionsSource].getCanonicalName}")
- assert(getPathOption("src") == Some(CatalogUtils.URIToString(defaultTablePath("src"))))
+ assert(getPathOption("src").map(makeQualifiedPath) == Some(defaultTablePath("src")))
}
}
@@ -95,9 +95,9 @@ class PathOptionSuite extends DataSourceTest with SharedSQLContext {
|OPTIONS (PATH '$p')
|AS SELECT 1
""".stripMargin)
- assert(CatalogUtils.stringToURI(
- spark.table("src").schema.head.metadata.getString("path")) ==
- makeQualifiedPath(p.getAbsolutePath))
+ assert(
+ spark.table("src").schema.head.metadata.getString("path") ==
+ p.getAbsolutePath)
}
}
@@ -109,8 +109,9 @@ class PathOptionSuite extends DataSourceTest with SharedSQLContext {
|USING ${classOf[TestOptionsSource].getCanonicalName}
|AS SELECT 1
""".stripMargin)
- assert(spark.table("src").schema.head.metadata.getString("path") ==
- CatalogUtils.URIToString(defaultTablePath("src")))
+ assert(
+ makeQualifiedPath(spark.table("src").schema.head.metadata.getString("path")) ==
+ defaultTablePath("src"))
}
}
@@ -122,13 +123,13 @@ class PathOptionSuite extends DataSourceTest with SharedSQLContext {
|USING ${classOf[TestOptionsSource].getCanonicalName}
|OPTIONS (PATH '/tmp/path')""".stripMargin)
sql("ALTER TABLE src SET LOCATION '/tmp/path2'")
- assert(getPathOption("src") == Some("/tmp/path2"))
+ assert(getPathOption("src").map(makeQualifiedPath) == Some(makeQualifiedPath("/tmp/path2")))
}
withTable("src", "src2") {
sql(s"CREATE TABLE src(i int) USING ${classOf[TestOptionsSource].getCanonicalName}")
sql("ALTER TABLE src RENAME TO src2")
- assert(getPathOption("src2") == Some(CatalogUtils.URIToString(defaultTablePath("src2"))))
+ assert(getPathOption("src2").map(makeQualifiedPath) == Some(defaultTablePath("src2")))
}
}
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index 10f432570e94..6b98066cb76c 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -175,7 +175,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
bucketSpec = None,
fileFormat = fileFormat,
options = options)(sparkSession = sparkSession)
- val created = LogicalRelation(fsRelation, catalogTable = Some(updatedTable))
+ val created = LogicalRelation(fsRelation, updatedTable)
tableRelationCache.put(tableIdentifier, created)
created
}
@@ -203,7 +203,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
bucketSpec = None,
options = options,
className = fileType).resolveRelation(),
- catalogTable = Some(updatedTable))
+ table = updatedTable)
tableRelationCache.put(tableIdentifier, created)
created
@@ -212,7 +212,14 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
logicalRelation
})
}
- result.copy(expectedOutputAttributes = Some(relation.output))
+    // The inferred schema may have different field names from the table schema. We should respect
+    // the inferred names, but also respect the exprIds in the table relation's output.
+ assert(result.output.length == relation.output.length &&
+ result.output.zip(relation.output).forall { case (a1, a2) => a1.dataType == a2.dataType })
+ val newOutput = result.output.zip(relation.output).map {
+ case (a1, a2) => a1.withExprId(a2.exprId)
+ }
+ result.copy(output = newOutput)
}
private def inferIfNeeded(
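The exprId-rewriting step above can be hard to picture from the diff alone; below is a tiny, self-contained sketch of the same `withExprId` pattern on hand-built attributes (the attribute names and the object wrapper are illustrative only):

```scala
import org.apache.spark.sql.catalyst.expressions.AttributeReference
import org.apache.spark.sql.types.IntegerType

object ExprIdRemapSketch {
  def main(args: Array[String]): Unit = {
    // Output resolved from the inferred (file) schema: the names we want to keep.
    val inferred = Seq(
      AttributeReference("COL_A", IntegerType)(),
      AttributeReference("col_b", IntegerType)())

    // Output of the existing table relation: the expression ids we must preserve.
    val fromTable = Seq(
      AttributeReference("col_a", IntegerType)(),
      AttributeReference("col_b", IntegerType)())

    // Keep the inferred names but adopt the table-side expression ids, as in the patch.
    val newOutput = inferred.zip(fromTable).map { case (a1, a2) => a1.withExprId(a2.exprId) }
    newOutput.foreach(a => println(s"${a.name}#${a.exprId.id}"))
  }
}
```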
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala
index 2b3f36064c1f..d3cbf898e243 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala
@@ -329,7 +329,7 @@ class CachedTableSuite extends QueryTest with SQLTestUtils with TestHiveSingleto
fileFormat = new ParquetFileFormat(),
options = Map.empty)(sparkSession = spark)
- val plan = LogicalRelation(relation, catalogTable = Some(tableMeta))
+ val plan = LogicalRelation(relation, tableMeta)
spark.sharedState.cacheManager.cacheQuery(Dataset.ofRows(spark, plan))
assert(spark.sharedState.cacheManager.lookupCachedData(plan).isDefined)
@@ -342,7 +342,7 @@ class CachedTableSuite extends QueryTest with SQLTestUtils with TestHiveSingleto
bucketSpec = None,
fileFormat = new ParquetFileFormat(),
options = Map.empty)(sparkSession = spark)
- val samePlan = LogicalRelation(sameRelation, catalogTable = Some(tableMeta))
+ val samePlan = LogicalRelation(sameRelation, tableMeta)
assert(spark.sharedState.cacheManager.lookupCachedData(samePlan).isDefined)
}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruneFileSourcePartitionsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruneFileSourcePartitionsSuite.scala
index cd8f94b1cc4f..f818e2955546 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruneFileSourcePartitionsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruneFileSourcePartitionsSuite.scala
@@ -58,7 +58,7 @@ class PruneFileSourcePartitionsSuite extends QueryTest with SQLTestUtils with Te
fileFormat = new ParquetFileFormat(),
options = Map.empty)(sparkSession = spark)
- val logicalRelation = LogicalRelation(relation, catalogTable = Some(tableMeta))
+ val logicalRelation = LogicalRelation(relation, tableMeta)
val query = Project(Seq('i, 'p), Filter('p === 1, logicalRelation)).analyze
val optimized = Optimize.execute(query)