@@ -108,7 +108,7 @@ class HadoopTableReader(
     val broadcastedHadoopConf = _broadcastedHadoopConf
 
     val tablePath = hiveTable.getPath
-    val inputPathStr = applyFilterIfNeeded(tablePath, filterOpt)
+    val inputPathStr = tablePath.toString
 
     // logDebug("Table input: %s".format(tablePath))
     val ifc = hiveTable.getInputFormatClass
@@ -190,7 +190,7 @@ class HadoopTableReader(
       .map { case (partition, partDeserializer) =>
         val partDesc = Utilities.getPartitionDesc(partition)
         val partPath = partition.getDataLocation
-        val inputPathStr = applyFilterIfNeeded(partPath, filterOpt)
+        val inputPathStr = partPath.toString
         val ifc = partDesc.getInputFileFormatClass
           .asInstanceOf[java.lang.Class[InputFormat[Writable, Writable]]]
         // Get partition field info
@@ -252,20 +252,6 @@ class HadoopTableReader(
     }
   }
 
-  /**
-   * If `filterOpt` is defined, then it will be used to filter files from `path`. These files are
-   * returned in a single, comma-separated string.
-   */
-  private def applyFilterIfNeeded(path: Path, filterOpt: Option[PathFilter]): String = {
-    filterOpt match {
-      case Some(filter) =>
-        val fs = path.getFileSystem(hadoopConf)
-        val filteredFiles = fs.listStatus(path, filter).map(_.getPath.toString)
-        filteredFiles.mkString(",")
-      case None => path.toString
-    }
-  }
-
   /**
    * Creates a HadoopRDD based on the broadcasted HiveConf and other job properties that will be
    * applied locally on each slave.
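
For reference, the removed helper was a thin wrapper around Hadoop's
FileSystem.listStatus(Path, PathFilter) API: with a filter it listed the
accepted files under a path and joined them into a single comma-separated
string; without one it returned the path itself. A minimal, self-contained
sketch of that pattern follows (the object and method names here are
illustrative, not part of Spark):

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{Path, PathFilter}

object PathFilterSketch {
  // Same shape as the removed applyFilterIfNeeded: list the files under
  // `path` accepted by the filter and join them with commas, or fall back
  // to the bare path string when no filter is given.
  def listFilteredPaths(
      path: Path,
      filterOpt: Option[PathFilter],
      hadoopConf: Configuration): String = filterOpt match {
    case Some(filter) =>
      val fs = path.getFileSystem(hadoopConf)
      fs.listStatus(path, filter).map(_.getPath.toString).mkString(",")
    case None => path.toString
  }

  def main(args: Array[String]): Unit = {
    val conf = new Configuration()
    // Illustrative filter: keep only files ending in ".orc".
    val orcOnly: PathFilter = (p: Path) => p.getName.endsWith(".orc")
    println(listFilteredPaths(new Path("/tmp/warehouse/t"), Some(orcOnly), conf))
  }
}

As the diff itself shows, both remaining call sites reduce to path.toString,
which suggests no caller ever supplied a filter and the helper had become
dead code.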