Skip to content

Commit 0adc99b

Browse files
committed
Add hooks for selecting the set of files for a table scan
1 parent af1e01f commit 0adc99b

File tree

4 files changed

+42
-4
lines changed

4 files changed

+42
-4
lines changed

sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -150,6 +150,9 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
150150
@transient
151151
protected[sql] lazy val substitutor = new VariableSubstitution()
152152

153+
@transient
// Optional pluggable hook for selecting the set of files scanned for a (non-partitioned)
// table; None (the default) means the standard path-filter behavior is used.
// Marked @transient so it is not serialized with the context.
protected[sql] var hadoopFileSelector: Option[HadoopFileSelector] = None
155+
153156
/**
154157
* The copy of the hive client that is used for execution. Currently this must always be
155158
* Hive 13 as this is the version of Hive that is packaged with Spark SQL. This copy of the
@@ -514,6 +517,15 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
514517
case _ => super.simpleString
515518
}
516519
}
520+
521+
/**
 * Registers a function that is applied to a table name before it is looked up in the
 * Hive metastore. Delegates to the underlying catalog.
 *
 * @param tableNamePreprocessor function mapping a raw table name to the name to look up
 */
def setTableNamePreprocessor(tableNamePreprocessor: (String) => String): Unit =
  catalog.setTableNamePreprocessor(tableNamePreprocessor)
524+
525+
/**
 * Installs (or clears) the custom file-selection hook used when scanning a table.
 *
 * @param hadoopFileSelector the selector to use, or [[None]] to restore default behavior
 */
def setHadoopFileSelector(hadoopFileSelector: Option[HadoopFileSelector]): Unit =
  this.hadoopFileSelector = hadoopFileSelector
528+
517529
}
518530

519531

sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -217,14 +217,21 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive
217217
client.getTableOption(databaseName, tblName).isDefined
218218
}
219219

220+
// Function applied to a raw table name before the metastore lookup; defaults to the
// identity function (no preprocessing).
private[this] var tableNamePreprocessor: (String) => String = identity

/**
 * Replaces the table-name preprocessing function used by `lookupRelation`.
 *
 * @param newTableNamePreprocessor function mapping a raw table name to the name to look up
 */
def setTableNamePreprocessor(newTableNamePreprocessor: (String) => String): Unit =
  tableNamePreprocessor = newTableNamePreprocessor
225+
220226
def lookupRelation(
221227
tableIdentifier: Seq[String],
222228
alias: Option[String]): LogicalPlan = {
223229
val tableIdent = processTableIdentifier(tableIdentifier)
224230
val databaseName = tableIdent.lift(tableIdent.size - 2).getOrElse(
225231
client.currentDatabase)
226-
val tblName = tableIdent.last
227-
val table = client.getTable(databaseName, tblName)
232+
val rawTableName = tableIdent.last
233+
val tblName = tableNamePreprocessor(rawTableName)
234+
val table = client.getTable(databaseName, tblName).withTableName(rawTableName)
228235

229236
if (table.properties.get("spark.sql.sources.provider").isDefined) {
230237
val dataSourceTable =

sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
package org.apache.spark.sql.hive
1919

2020
import org.apache.hadoop.conf.Configuration
21-
import org.apache.hadoop.fs.{Path, PathFilter}
21+
import org.apache.hadoop.fs.{FileSystem, Path, PathFilter}
2222
import org.apache.hadoop.hive.conf.HiveConf
2323
import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants._
2424
import org.apache.hadoop.hive.ql.exec.Utilities
@@ -106,7 +106,11 @@ class HadoopTableReader(
106106
val broadcastedHiveConf = _broadcastedHiveConf
107107

108108
val tablePath = hiveTable.getPath
109-
val inputPathStr = applyFilterIfNeeded(tablePath, filterOpt)
109+
val fs = tablePath.getFileSystem(sc.hiveconf)
110+
val inputPathStr =
111+
sc.hadoopFileSelector.flatMap(
112+
_.selectFiles(relation.tableName, fs, tablePath)).map(_.mkString(",")).getOrElse(
113+
applyFilterIfNeeded(tablePath, filterOpt))
110114

111115
// logDebug("Table input: %s".format(tablePath))
112116
val ifc = hiveTable.getInputFormatClass
@@ -396,3 +400,16 @@ private[hive] object HadoopTableReader extends HiveInspectors with Logging {
396400
}
397401
}
398402
}
403+
404+
/**
 * Extension point allowing clients to plug in a custom algorithm for choosing which
 * files constitute a table during a scan.
 */
abstract class HadoopFileSelector {
  /**
   * Select files constituting a table from the given base path according to the client's custom
   * algorithm. This is only applied to non-partitioned tables.
   *
   * @param tableName table name to select files for
   * @param fs the filesystem containing the table
   * @param basePath base path of the table in the filesystem
   * @return a set of files, or [[None]] if the custom file selection algorithm does not apply
   *         to this table.
   */
  def selectFiles(tableName: String, fs: FileSystem, basePath: Path): Option[Seq[Path]]
}

sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientInterface.scala

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,8 @@ private[hive] case class HiveTable(
6767
this
6868
}
6969

70+
/** Returns a copy of this table renamed to `newName`, re-attaching the current client. */
def withTableName(newName: String): HiveTable = copy(name = newName).withClient(client)
71+
7072
def database: String = specifiedDatabase.getOrElse(sys.error("database not resolved"))
7173

7274
def isPartitioned: Boolean = partitionColumns.nonEmpty

0 commit comments

Comments
 (0)