
Commit 80b36f3

fix compilation
1 parent 53cfb06 commit 80b36f3

3 files changed (+40, -31 lines)


sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala

Lines changed: 1 addition & 1 deletion
@@ -189,13 +189,13 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
     }
 
     val allPaths = (CaseInsensitiveMap(extraOptions.toMap).get("path") ++ paths).toSeq
-    option("path", allPaths.mkString(","))
     val cls = DataSource.lookupDataSource(source, sparkSession.sessionState.conf, allPaths)
     if (classOf[DataSourceV2].isAssignableFrom(cls)) {
       val ds = cls.newInstance().asInstanceOf[DataSourceV2]
       if (ds.isInstanceOf[ReadSupport] || ds.isInstanceOf[ReadSupportWithSchema]) {
         val sessionOptions = DataSourceV2Utils.extractSessionConfigs(
           ds = ds, conf = sparkSession.sessionState.conf)
+        option("path", allPaths.mkString(","))
         Dataset.ofRows(sparkSession, DataSourceV2Relation.create(
           ds, extraOptions.toMap ++ sessionOptions,
           userSpecifiedSchema = userSpecifiedSchema))
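
In short, the "path" option is now only folded back into the reader options on the DataSourceV2 read-support branch, after the session configs are extracted. A minimal, self-contained sketch (plain Scala, not Spark code; the option and path values are made up) of how the comma-joined path entry ends up in the merged reader options:

object PathOptionSketch {
  def main(args: Array[String]): Unit = {
    // Stand-ins for .option(...) settings and the paths passed to .load(...).
    val extraOptions = Map("mode" -> "FAILFAST")
    val paths = Seq("/data/a", "/data/b")

    // Mirrors `(CaseInsensitiveMap(extraOptions.toMap).get("path") ++ paths).toSeq`
    // (minus the case-insensitive wrapper) and `allPaths.mkString(",")` from the diff.
    val allPaths = (extraOptions.get("path") ++ paths).toSeq
    val readerOptions = extraOptions + ("path" -> allPaths.mkString(","))

    println(readerOptions) // Map(mode -> FAILFAST, path -> /data/a,/data/b)
  }
}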

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala

Lines changed: 30 additions & 17 deletions
@@ -23,6 +23,7 @@ import scala.collection.JavaConverters._
 import scala.language.{existentials, implicitConversions}
 import scala.util.{Failure, Success, Try}
 
+import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
 
 import org.apache.spark.deploy.SparkHadoopUtil
@@ -538,23 +539,8 @@ case class DataSource(
       checkFilesExist: Boolean): Seq[Path] = {
     val allPaths = caseInsensitiveOptions.get("path") ++ paths
     val hadoopConf = sparkSession.sessionState.newHadoopConf()
-    allPaths.flatMap { path =>
-      val hdfsPath = new Path(path)
-      val fs = hdfsPath.getFileSystem(hadoopConf)
-      val qualified = hdfsPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
-      val globPath = SparkHadoopUtil.get.globPathIfNecessary(fs, qualified)
-
-      if (checkEmptyGlobPath && globPath.isEmpty) {
-        throw new AnalysisException(s"Path does not exist: $qualified")
-      }
-
-      // Sufficient to check head of the globPath seq for non-glob scenario
-      // Don't need to check once again if files exist in streaming mode
-      if (checkFilesExist && !fs.exists(globPath.head)) {
-        throw new AnalysisException(s"Path does not exist: ${globPath.head}")
-      }
-      globPath
-    }.toSeq
+    DataSource.checkAndGlobPathIfNecessary(allPaths.toSeq, hadoopConf,
+      checkEmptyGlobPath, checkFilesExist)
   }
 }
 
@@ -701,6 +687,33 @@ object DataSource extends Logging {
     }
   }
 
+  /**
+   * Checks and returns files in all the paths.
+   */
+  private[sql] def checkAndGlobPathIfNecessary(
+      paths: Seq[String],
+      hadoopConf: Configuration,
+      checkEmptyGlobPath: Boolean,
+      checkFilesExist: Boolean): Seq[Path] = {
+    paths.flatMap { path =>
+      val hdfsPath = new Path(path)
+      val fs = hdfsPath.getFileSystem(hadoopConf)
+      val qualified = hdfsPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
+      val globPath = SparkHadoopUtil.get.globPathIfNecessary(fs, qualified)
+
+      if (checkEmptyGlobPath && globPath.isEmpty) {
+        throw new AnalysisException(s"Path does not exist: $qualified")
+      }
+
+      // Sufficient to check head of the globPath seq for non-glob scenario
+      // Don't need to check once again if files exist in streaming mode
+      if (checkFilesExist && !fs.exists(globPath.head)) {
+        throw new AnalysisException(s"Path does not exist: ${globPath.head}")
+      }
+      globPath
+    }
+  }
+
   /**
    * When creating a data source table, the `path` option has a special meaning: the table location.
    * This method extracts the `path` option and treat it as table location to build a
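
For reference, a standalone analogue of the path-resolution pattern that the new checkAndGlobPathIfNecessary helper centralizes. This is a hedged sketch, not the Spark helper: it uses FileSystem.globStatus (which also checks existence for non-glob paths, unlike SparkHadoopUtil.globPathIfNecessary) and a plain IllegalArgumentException in place of AnalysisException; only the Hadoop client libraries are assumed on the classpath.

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

object GlobCheckSketch {
  // Qualify each input path against its FileSystem, expand globs, and fail
  // early when nothing matches (approximating checkEmptyGlobPath/checkFilesExist).
  def checkAndGlob(paths: Seq[String], conf: Configuration): Seq[Path] = {
    paths.flatMap { p =>
      val hdfsPath = new Path(p)
      val fs = hdfsPath.getFileSystem(conf)
      val qualified = hdfsPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
      // globStatus returns null when a non-glob path is missing and an empty
      // array when a glob matches nothing; treat both as "path does not exist".
      val matches = Option(fs.globStatus(qualified)).map(_.toSeq).getOrElse(Nil)
      if (matches.isEmpty) {
        throw new IllegalArgumentException(s"Path does not exist: $qualified")
      }
      matches.map(_.getPath)
    }
  }

  def main(args: Array[String]): Unit = {
    println(checkAndGlob(Seq("file:///tmp/*.txt"), new Configuration()))
  }
}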

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileSourceReader.scala

Lines changed: 9 additions & 13 deletions
@@ -63,27 +63,23 @@ trait FileSourceReader extends DataSourceReader
   protected val isCaseSensitive = sqlConf.caseSensitiveAnalysis
   protected val ignoreCorruptFiles = sqlConf.ignoreCorruptFiles
   protected val ignoreMissingFiles = sqlConf.ignoreMissingFiles
-  private val rootPathsSpecified = {
+  private lazy val rootPathsSpecified = {
     val filePath = options.get("path")
     if (!filePath.isPresent) {
       throw new AnalysisException("Reading data source requires a" +
         " path (e.g. data backed by a local or distributed file system).")
     }
-    DataSource.checkAndGlobPathIfNecessary(hadoopConf, filePath.get, checkFilesExist = true)
-  }
-  private val fileStatusCache = FileStatusCache.getOrCreate(sparkSession)
-  protected val partitionSchema = {
-    val tempFileIndex = {
-      new InMemoryFileIndex(sparkSession, rootPathsSpecified,
-        options.asMap().asScala.toMap, None, fileStatusCache)
-    }
-    PartitioningUtils.combineInferredAndUserSpecifiedPartitionSchema(
-      tempFileIndex, userSpecifiedSchema, isCaseSensitive)
+    DataSource.checkAndGlobPathIfNecessary(Seq(filePath.get), hadoopConf,
+      checkEmptyGlobPath = false, checkFilesExist = false)
   }
 
-  protected val fileIndex =
+  protected lazy val fileIndex = {
+    val fileStatusCache = FileStatusCache.getOrCreate(sparkSession)
     new InMemoryFileIndex(sparkSession, rootPathsSpecified,
-      options.asMap().asScala.toMap, Some(partitionSchema), fileStatusCache)
+      options.asMap().asScala.toMap, userSpecifiedSchema, fileStatusCache)
+  }
+
+  protected lazy val partitionSchema = fileIndex.partitionSchema
 
   protected lazy val dataSchema = userSpecifiedSchema.orElse {
     inferSchema(fileIndex.allFiles())
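
Aside from the updated call to the four-argument helper, the reader members become lazy vals. A minimal sketch (plain Scala, not Spark code; the trait and field names are invented) of the property this buys in a trait: lazy members are computed on first access, so a member that depends on something the concrete subclass supplies no longer risks seeing an uninitialized value during construction.

object LazyValSketch {
  trait Reader {
    def path: String // supplied by the concrete reader
    // With an eager `val` here, Seq(path) could run during trait initialization,
    // before the subclass has provided `path`; `lazy val` defers it to first use.
    lazy val rootPaths: Seq[String] = Seq(path)
    lazy val fileIndex: String = rootPaths.mkString(",")
  }

  def main(args: Array[String]): Unit = {
    val r = new Reader { val path = "/data/events" }
    println(r.fileIndex) // /data/events
  }
}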
