@@ -110,6 +110,9 @@ case class DataSource(
110110
111111 private def providingInstance () = providingClass.getConstructor().newInstance()
112112
113+ private def newHadoopConfiguration (): Configuration =
114+ sparkSession.sessionState.newHadoopConfWithOptions(options)
115+
113116 lazy val sourceInfo : SourceInfo = sourceSchema()
114117 private val caseInsensitiveOptions = CaseInsensitiveMap (options)
115118 private val equality = sparkSession.sessionState.conf.resolver
@@ -231,7 +234,7 @@ case class DataSource(
231234 // once the streaming job starts and some upstream source starts dropping data.
232235 val hdfsPath = new Path (path)
233236 if (! SparkHadoopUtil .get.isGlobPath(hdfsPath)) {
234- val fs = hdfsPath.getFileSystem(sparkSession.sessionState.newHadoopConf ())
237+ val fs = hdfsPath.getFileSystem(newHadoopConfiguration ())
235238 if (! fs.exists(hdfsPath)) {
236239 throw new AnalysisException (s " Path does not exist: $path" )
237240 }
@@ -358,7 +361,7 @@ case class DataSource(
358361 case (format : FileFormat , _)
359362 if FileStreamSink .hasMetadata(
360363 caseInsensitiveOptions.get(" path" ).toSeq ++ paths,
361- sparkSession.sessionState.newHadoopConf (),
364+ newHadoopConfiguration (),
362365 sparkSession.sessionState.conf) =>
363366 val basePath = new Path ((caseInsensitiveOptions.get(" path" ).toSeq ++ paths).head)
364367 val fileCatalog = new MetadataLogFileIndex (sparkSession, basePath,
@@ -450,7 +453,7 @@ case class DataSource(
450453 val allPaths = paths ++ caseInsensitiveOptions.get(" path" )
451454 val outputPath = if (allPaths.length == 1 ) {
452455 val path = new Path (allPaths.head)
453- val fs = path.getFileSystem(sparkSession.sessionState.newHadoopConf ())
456+ val fs = path.getFileSystem(newHadoopConfiguration ())
454457 path.makeQualified(fs.getUri, fs.getWorkingDirectory)
455458 } else {
456459 throw new IllegalArgumentException (" Expected exactly one path to be specified, but " +
0 commit comments