Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,8 @@ abstract class ExternalCatalog {
table: String,
loadPath: String,
isOverwrite: Boolean,
holdDDLTime: Boolean): Unit
holdDDLTime: Boolean,
isSrcLocal: Boolean): Unit
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what does isSrcLocal mean? Can you document it?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It means the source data comes from a "LOAD DATA LOCAL" query.

I can add a partial scaladoc to these methods, but I don't really know the meaning of some of the other arguments, so I can't write a complete one.


def loadPartition(
db: String,
Expand All @@ -128,7 +129,8 @@ abstract class ExternalCatalog {
partition: TablePartitionSpec,
isOverwrite: Boolean,
holdDDLTime: Boolean,
inheritTableSpecs: Boolean): Unit
inheritTableSpecs: Boolean,
isSrcLocal: Boolean): Unit

def loadDynamicPartitions(
db: String,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -312,7 +312,8 @@ class InMemoryCatalog(
table: String,
loadPath: String,
isOverwrite: Boolean,
holdDDLTime: Boolean): Unit = {
holdDDLTime: Boolean,
isSrcLocal: Boolean): Unit = {
throw new UnsupportedOperationException("loadTable is not implemented")
}

Expand All @@ -323,7 +324,8 @@ class InMemoryCatalog(
partition: TablePartitionSpec,
isOverwrite: Boolean,
holdDDLTime: Boolean,
inheritTableSpecs: Boolean): Unit = {
inheritTableSpecs: Boolean,
isSrcLocal: Boolean): Unit = {
throw new UnsupportedOperationException("loadPartition is not implemented.")
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -311,12 +311,13 @@ class SessionCatalog(
name: TableIdentifier,
loadPath: String,
isOverwrite: Boolean,
holdDDLTime: Boolean): Unit = {
holdDDLTime: Boolean,
isSrcLocal: Boolean): Unit = {
val db = formatDatabaseName(name.database.getOrElse(getCurrentDatabase))
val table = formatTableName(name.table)
requireDbExists(db)
requireTableExists(TableIdentifier(table, Some(db)))
externalCatalog.loadTable(db, table, loadPath, isOverwrite, holdDDLTime)
externalCatalog.loadTable(db, table, loadPath, isOverwrite, holdDDLTime, isSrcLocal)
}

/**
Expand All @@ -330,13 +331,14 @@ class SessionCatalog(
partition: TablePartitionSpec,
isOverwrite: Boolean,
holdDDLTime: Boolean,
inheritTableSpecs: Boolean): Unit = {
inheritTableSpecs: Boolean,
isSrcLocal: Boolean): Unit = {
val db = formatDatabaseName(name.database.getOrElse(getCurrentDatabase))
val table = formatTableName(name.table)
requireDbExists(db)
requireTableExists(TableIdentifier(table, Some(db)))
externalCatalog.loadPartition(
db, table, loadPath, partition, isOverwrite, holdDDLTime, inheritTableSpecs)
db, table, loadPath, partition, isOverwrite, holdDDLTime, inheritTableSpecs, isSrcLocal)
}

def defaultTablePath(tableIdent: TableIdentifier): String = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ case class LoadDataCommand(
throw new AnalysisException(s"LOAD DATA target table $tableIdentwithDB is partitioned, " +
s"but number of columns in provided partition spec (${partition.get.size}) " +
s"do not match number of partitioned columns in table " +
s"(s${targetTable.partitionColumnNames.size})")
s"(${targetTable.partitionColumnNames.size})")
}
partition.get.keys.foreach { colName =>
if (!targetTable.partitionColumnNames.contains(colName)) {
Expand Down Expand Up @@ -297,13 +297,15 @@ case class LoadDataCommand(
partition.get,
isOverwrite,
holdDDLTime = false,
inheritTableSpecs = true)
inheritTableSpecs = true,
isSrcLocal = isLocal)
} else {
catalog.loadTable(
targetTable.identifier,
loadPath.toString,
isOverwrite,
holdDDLTime = false)
holdDDLTime = false,
isSrcLocal = isLocal)
}
Seq.empty[Row]
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -736,13 +736,15 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
table: String,
loadPath: String,
isOverwrite: Boolean,
holdDDLTime: Boolean): Unit = withClient {
holdDDLTime: Boolean,
isSrcLocal: Boolean): Unit = withClient {
requireTableExists(db, table)
client.loadTable(
loadPath,
s"$db.$table",
isOverwrite,
holdDDLTime)
holdDDLTime,
isSrcLocal)
}

override def loadPartition(
Expand All @@ -752,7 +754,8 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
partition: TablePartitionSpec,
isOverwrite: Boolean,
holdDDLTime: Boolean,
inheritTableSpecs: Boolean): Unit = withClient {
inheritTableSpecs: Boolean,
isSrcLocal: Boolean): Unit = withClient {
requireTableExists(db, table)

val orderedPartitionSpec = new util.LinkedHashMap[String, String]()
Expand All @@ -771,7 +774,8 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
orderedPartitionSpec,
isOverwrite,
holdDDLTime,
inheritTableSpecs)
inheritTableSpecs,
isSrcLocal)
}

override def loadDynamicPartitions(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -211,14 +211,16 @@ private[hive] trait HiveClient {
partSpec: java.util.LinkedHashMap[String, String], // Hive relies on LinkedHashMap ordering
replace: Boolean,
holdDDLTime: Boolean,
inheritTableSpecs: Boolean): Unit
inheritTableSpecs: Boolean,
isSrcLocal: Boolean): Unit

/** Loads data into an existing table. */
def loadTable(
loadPath: String, // TODO URI
tableName: String,
replace: Boolean,
holdDDLTime: Boolean): Unit
holdDDLTime: Boolean,
isSrcLocal: Boolean): Unit

/** Loads new dynamic partitions into an existing table. */
def loadDynamicPartitions(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -651,7 +651,8 @@ private[hive] class HiveClientImpl(
partSpec: java.util.LinkedHashMap[String, String],
replace: Boolean,
holdDDLTime: Boolean,
inheritTableSpecs: Boolean): Unit = withHiveState {
inheritTableSpecs: Boolean,
isSrcLocal: Boolean): Unit = withHiveState {
val hiveTable = client.getTable(dbName, tableName, true /* throw exception */)
shim.loadPartition(
client,
Expand All @@ -661,20 +662,23 @@ private[hive] class HiveClientImpl(
replace,
holdDDLTime,
inheritTableSpecs,
isSkewedStoreAsSubdir = hiveTable.isStoredAsSubDirectories)
isSkewedStoreAsSubdir = hiveTable.isStoredAsSubDirectories,
isSrcLocal = isSrcLocal)
}

def loadTable(
loadPath: String, // TODO URI
tableName: String,
replace: Boolean,
holdDDLTime: Boolean): Unit = withHiveState {
holdDDLTime: Boolean,
isSrcLocal: Boolean): Unit = withHiveState {
shim.loadTable(
client,
new Path(loadPath),
tableName,
replace,
holdDDLTime)
holdDDLTime,
isSrcLocal)
}

def loadDynamicPartitions(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,14 +98,16 @@ private[client] sealed abstract class Shim {
replace: Boolean,
holdDDLTime: Boolean,
inheritTableSpecs: Boolean,
isSkewedStoreAsSubdir: Boolean): Unit
isSkewedStoreAsSubdir: Boolean,
isSrcLocal: Boolean): Unit

def loadTable(
hive: Hive,
loadPath: Path,
tableName: String,
replace: Boolean,
holdDDLTime: Boolean): Unit
holdDDLTime: Boolean,
isSrcLocal: Boolean): Unit

def loadDynamicPartitions(
hive: Hive,
Expand Down Expand Up @@ -332,7 +334,8 @@ private[client] class Shim_v0_12 extends Shim with Logging {
replace: Boolean,
holdDDLTime: Boolean,
inheritTableSpecs: Boolean,
isSkewedStoreAsSubdir: Boolean): Unit = {
isSkewedStoreAsSubdir: Boolean,
isSrcLocal: Boolean): Unit = {
loadPartitionMethod.invoke(hive, loadPath, tableName, partSpec, replace: JBoolean,
holdDDLTime: JBoolean, inheritTableSpecs: JBoolean, isSkewedStoreAsSubdir: JBoolean)
}
Expand All @@ -342,7 +345,8 @@ private[client] class Shim_v0_12 extends Shim with Logging {
loadPath: Path,
tableName: String,
replace: Boolean,
holdDDLTime: Boolean): Unit = {
holdDDLTime: Boolean,
isSrcLocal: Boolean): Unit = {
loadTableMethod.invoke(hive, loadPath, tableName, replace: JBoolean, holdDDLTime: JBoolean)
}

Expand Down Expand Up @@ -698,20 +702,22 @@ private[client] class Shim_v0_14 extends Shim_v0_13 {
replace: Boolean,
holdDDLTime: Boolean,
inheritTableSpecs: Boolean,
isSkewedStoreAsSubdir: Boolean): Unit = {
isSkewedStoreAsSubdir: Boolean,
isSrcLocal: Boolean): Unit = {
loadPartitionMethod.invoke(hive, loadPath, tableName, partSpec, replace: JBoolean,
holdDDLTime: JBoolean, inheritTableSpecs: JBoolean, isSkewedStoreAsSubdir: JBoolean,
isSrcLocal(loadPath, hive.getConf()): JBoolean, JBoolean.FALSE)
isSrcLocal: JBoolean, JBoolean.FALSE)
}

override def loadTable(
hive: Hive,
loadPath: Path,
tableName: String,
replace: Boolean,
holdDDLTime: Boolean): Unit = {
holdDDLTime: Boolean,
isSrcLocal: Boolean): Unit = {
loadTableMethod.invoke(hive, loadPath, tableName, replace: JBoolean, holdDDLTime: JBoolean,
isSrcLocal(loadPath, hive.getConf()): JBoolean, JBoolean.FALSE, JBoolean.FALSE)
isSrcLocal: JBoolean, JBoolean.FALSE, JBoolean.FALSE)
}

override def loadDynamicPartitions(
Expand Down Expand Up @@ -749,12 +755,6 @@ private[client] class Shim_v0_14 extends Shim_v0_13 {
TimeUnit.MILLISECONDS).asInstanceOf[Long]
}

protected def isSrcLocal(path: Path, conf: HiveConf): Boolean = {
val localFs = FileSystem.getLocal(conf)
val pathFs = FileSystem.get(path.toUri(), conf)
localFs.getUri() == pathFs.getUri()
}

}

private[client] class Shim_v1_0 extends Shim_v0_14 {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -316,7 +316,8 @@ case class InsertIntoHiveTable(
partitionSpec,
isOverwrite = doHiveOverwrite,
holdDDLTime = holdDDLTime,
inheritTableSpecs = inheritTableSpecs)
inheritTableSpecs = inheritTableSpecs,
isSrcLocal = false)
}
}
} else {
Expand All @@ -325,7 +326,8 @@ case class InsertIntoHiveTable(
table.catalogTable.identifier.table,
outputPath.toString, // TODO: URI
overwrite,
holdDDLTime)
holdDDLTime,
isSrcLocal = false)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Then, how can we know this is always not a local file system (e.g., as you said above, if your warehouse directory is in the local file system too)?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't need to. "isSrcLocal" comes from the user query.

"LOAD DATA LOCAL" -> "isSrcLocal" = true
anything else -> "isSrcLocal" = false

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see the reason why we can set it to false. The files are created by us. We can set it to false and let Hive move it instead of copying it.

}

// Invalidate the cache.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,8 @@ class VersionsSuite extends SparkFunSuite with Logging {
emptyDir,
tableName = "src",
replace = false,
holdDDLTime = false)
holdDDLTime = false,
isSrcLocal = false)
}

test(s"$version: tableExists") {
Expand Down Expand Up @@ -310,7 +311,8 @@ class VersionsSuite extends SparkFunSuite with Logging {
partSpec,
replace = false,
holdDDLTime = false,
inheritTableSpecs = false)
inheritTableSpecs = false,
isSrcLocal = false)
}

test(s"$version: loadDynamicPartitions") {
Expand Down
Loading